From 9507c7d588db5d75126a14f435e475d0c611e4ba Mon Sep 17 00:00:00 2001 From: hexueyu Date: Mon, 19 Aug 2024 15:27:17 +0800 Subject: [PATCH 1/2] HBase 1.x compatible --- .../com/alipay/oceanbase/hbase/OHTable.java | 79 +++++++++++++++---- .../alipay/oceanbase/hbase/OHTableClient.java | 23 ++++++ .../oceanbase/hbase/HTableTestBase.java | 32 +++++++- .../alipay/oceanbase/hbase/OHTableTest.java | 1 + 4 files changed, 120 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 1d76cf24..7ba1250e 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -26,6 +26,7 @@ import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.mutation.BatchOperation; import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; +import com.alipay.oceanbase.rpc.property.Property; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; @@ -385,18 +386,34 @@ public HTableDescriptor getTableDescriptor() { */ @Override public boolean exists(Get get) throws IOException { + get.setCheckExistenceOnly(true); Result r = get(get); return !r.isEmpty(); } @Override public boolean[] existsAll(List list) throws IOException { - throw new FeatureNotSupportedException("not supported yet."); + if (list.isEmpty()) { + return new boolean[]{}; + } + if (list.size() == 1) { + return new boolean[]{exists(list.get(0))}; + } + Result[] r = get(list); + boolean[] ret = new boolean[r.length]; + for (int i = 0; i < r.length; ++i){ + ret[i] = !r[i].isEmpty(); + } + return ret; } - @Override public Boolean[] exists(List gets) throws IOException { - throw new FeatureNotSupportedException("not supported yet'"); + Boolean[] result = new Boolean[gets.size()]; + boolean[] exists = existsAll(gets); + for (int i = 0; i < gets.size(); ++i) { + result[i] = exists[i]; + } + return result; } @Override @@ -495,6 +512,14 @@ public Result call() throws IOException { obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(), true); +// if (get.isClosestRowBefore()) { +// obTableQuery = buildObTableQuery(filter, null, false, +// get.getRow(), true, 1); +// obTableQuery.setScanOrder(ObScanOrder.Reverse); +// } else { +// obTableQuery = buildObTableQuery(filter, get.getRow(), true, +// get.getRow(), true, -1); +// } request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString, Bytes.toString(family))); @@ -558,10 +583,24 @@ public ResultScanner call() throws IOException { if (scan.getFamilyMap().keySet().isEmpty()) { filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(), scan.getMaxVersions(), null); - obTableQuery = buildObTableQuery(filter, scan); - - request = buildObTableQueryAsyncRequest(obTableQuery, - getTargetTableName(tableNameString)); +// obTableQuery = buildObTableQuery(filter, scan); +// +// request = buildObTableQueryAsyncRequest(obTableQuery, +// getTargetTableName(tableNameString)); +// if (scan.isReversed()) { +// obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, +// scan.getStartRow(), true, scan.getBatch()); +// } else { +// obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, +// scan.getStopRow(), false, scan.getBatch()); +// } +// if (scan.isReversed()) { // reverse scan 时设置为逆序 +// obTableQuery.setScanOrder(ObScanOrder.Reverse); +// } +// obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong( +// HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, +// HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE)); +// request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString)); clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient .execute(request); return new ClientStreamScanner(clientQueryAsyncStreamResult, @@ -573,6 +612,21 @@ public ResultScanner call() throws IOException { filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(), scan.getMaxVersions(), entry.getValue()); obTableQuery = buildObTableQuery(filter, scan); +// if (scan.isReversed()) { +// obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, +// scan.getStartRow(), true, scan.getBatch()); +// } else { +// obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, +// scan.getStopRow(), false, scan.getBatch()); +// } +// if (scan.isReversed()) { // reverse scan 时设置为逆序 +// obTableQuery.setScanOrder(ObScanOrder.Reverse); +// } +// +// // no support set maxResultSize. +// obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong( +// HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, +// HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE)); request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString, Bytes.toString(family))); @@ -1202,22 +1256,19 @@ public void setOperationTimeout(int operationTimeout) { (this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); } - // todo @Override public int getOperationTimeout() { - throw new FeatureNotSupportedException("not supported yet."); + return operationTimeout; } - //todo + // rpcTimeout means server max execute time, equal Table API rpc_execute_time, it must be set before OHTable init; please pass this parameter through conf @Override - public void setRpcTimeout(int i) { - throw new FeatureNotSupportedException("not supported yet."); + public void setRpcTimeout(int rpcTimeout) { } - // todo @Override public int getRpcTimeout() { - throw new FeatureNotSupportedException("not supported yet."); + return Integer.parseInt(configuration.get(Property.RPC_EXECUTE_TIMEOUT.getKey())); } public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) { diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java b/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java index a1d84329..cc66c24f 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java @@ -19,6 +19,7 @@ import com.alipay.oceanbase.hbase.core.Lifecycle; import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.rpc.property.Property; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.Service; @@ -228,6 +229,28 @@ public int getRpcTimeout() { return ohTable.getRpcTimeout(); } + @Override + public void setOperationTimeout(int operationTimeout) { + checkStatus(); + ohTable.setOperationTimeout(operationTimeout); + } + + @Override + public int getOperationTimeout() { + checkStatus(); + return ohTable.getOperationTimeout(); + } + + @Override + public void setRpcTimeout(int rpcTimeout) { + conf.set(Property.RPC_EXECUTE_TIMEOUT.getKey(), String.valueOf(rpcTimeout)); + } + + @Override + public int getRpcTimeout() { + return Integer.parseInt(conf.get(Property.RPC_EXECUTE_TIMEOUT.getKey())); + } + @Override public byte[] getTableName() { return tableName; diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index fed34c76..27b2cd62 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -2222,11 +2222,23 @@ public void testCheckAndPut() throws IOException, InterruptedException { boolean ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(), value.getBytes(), put); Assert.assertTrue(ret); + ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(), + CompareFilter.CompareOp.GREATER, "value1".getBytes(), put); + Assert.assertFalse(ret); + ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(), + CompareFilter.CompareOp.GREATER_OR_EQUAL, "value1".getBytes(), put); + Assert.assertTrue(ret); + ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(), + CompareFilter.CompareOp.LESS, "".getBytes(), put); + Assert.assertFalse(ret); + ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(), + CompareFilter.CompareOp.LESS_OR_EQUAL, "".getBytes(), put); + Assert.assertFalse(ret); get = new Get(key.getBytes()); get.setMaxVersions(Integer.MAX_VALUE); get.addColumn(family.getBytes(), column.getBytes()); r = hTable.get(get); - Assert.assertEquals(2, r.raw().length); + Assert.assertEquals(3, r.raw().length); Assert.assertEquals("value1", Bytes.toString(r.raw()[0].getValue())); } @@ -2251,6 +2263,24 @@ public void testCheckAndDelete() throws IOException { boolean ret = hTable.checkAndDelete(key.getBytes(), family.getBytes(), column.getBytes(), value.getBytes(), delete); Assert.assertTrue(ret); + put.add(family.getBytes(), column.getBytes(), "value6".getBytes()); + hTable.put(put); + ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(), + CompareFilter.CompareOp.GREATER, "value5".getBytes(), delete); + Assert.assertTrue(ret); + put.add(family.getBytes(), column.getBytes(), "value5".getBytes()); + hTable.put(put); + ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(), + CompareFilter.CompareOp.GREATER_OR_EQUAL, "value5".getBytes(), delete); + Assert.assertTrue(ret); + put.add(family.getBytes(), column.getBytes(), "value1".getBytes()); + hTable.put(put); + ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(), + CompareFilter.CompareOp.LESS, "value1".getBytes(), delete); + Assert.assertFalse(ret); + ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(), + CompareFilter.CompareOp.LESS_OR_EQUAL, "value1".getBytes(), delete); + Assert.assertTrue(ret); Get get = new Get(key.getBytes()); get.setMaxVersions(Integer.MAX_VALUE); get.addColumn(family.getBytes(), column.getBytes()); diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java index b14ca220..dade1ec1 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java @@ -19,6 +19,7 @@ import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.exception.ObTableNotExistException; +import com.alipay.oceanbase.rpc.property.Property; import com.alipay.sofa.common.thread.SofaThreadPoolExecutor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTableInterface; From ccc6123c892a85cd4baef1346fef3cd969c8e1cd Mon Sep 17 00:00:00 2001 From: hexueyu Date: Fri, 23 Aug 2024 17:30:19 +0800 Subject: [PATCH 2/2] add ObParams --- .../com/alipay/oceanbase/hbase/OHTable.java | 106 ++++++++++-------- .../alipay/oceanbase/hbase/OHTableClient.java | 22 ---- .../hbase/result/ClientStreamScanner.java | 5 +- .../oceanbase/hbase/HTableTestBase.java | 105 +++++++++++++++++ 4 files changed, 167 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index 7ba1250e..7dd2f1af 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -24,6 +24,9 @@ import com.alipay.oceanbase.hbase.result.ClientStreamScanner; import com.alipay.oceanbase.hbase.util.*; import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.table.ObHBaseParams; +import com.alipay.oceanbase.rpc.table.ObKVParamsBase; +import com.alipay.oceanbase.rpc.table.ObKVParams; import com.alipay.oceanbase.rpc.mutation.BatchOperation; import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult; import com.alipay.oceanbase.rpc.property.Property; @@ -166,6 +169,8 @@ public class OHTable implements HTableInterface { */ private final Configuration configuration; + private int scannerTimeout; + /** * Creates an object to access a HBase table. * Shares oceanbase table obTableClient and other resources with other OHTable instances @@ -189,6 +194,9 @@ public OHTable(Configuration configuration, String tableName) throws IOException DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX); long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME, DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME); + HBaseConfiguration.getInt(configuration, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime); this.obTableClient = ObTableClientManager .getOrCreateObTableClient(new OHConnectionConfiguration(configuration)); @@ -388,7 +396,7 @@ public HTableDescriptor getTableDescriptor() { public boolean exists(Get get) throws IOException { get.setCheckExistenceOnly(true); Result r = get(get); - return !r.isEmpty(); + return r.getExists(); } @Override @@ -401,8 +409,8 @@ public boolean[] existsAll(List list) throws IOException { } Result[] r = get(list); boolean[] ret = new boolean[r.length]; - for (int i = 0; i < r.length; ++i){ - ret[i] = !r[i].isEmpty(); + for (int i = 0; i < list.size(); ++i) { + ret[i] = exists(list.get(i)); } return ret; } @@ -497,11 +505,16 @@ public Result call() throws IOException { get.getMaxVersions(), null); obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(), true); - request = buildObTableQueryRequest(obTableQuery, - getTargetTableName(tableNameString)); + obTableQuery.setObKVParams(buildObHBaseParams(null, get)); + request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString)); clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient .execute(request); + if (get.isCheckExistenceOnly() ) { + Result result = new Result(); + result.setExists(clientQueryStreamResult.getCacheRows().size() != 0); + return result; + } getKeyValueFromResult(clientQueryStreamResult, keyValueList, true, family); } else { for (Map.Entry> entry : get.getFamilyMap() @@ -512,19 +525,25 @@ public Result call() throws IOException { obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(), true); -// if (get.isClosestRowBefore()) { -// obTableQuery = buildObTableQuery(filter, null, false, -// get.getRow(), true, 1); -// obTableQuery.setScanOrder(ObScanOrder.Reverse); -// } else { -// obTableQuery = buildObTableQuery(filter, get.getRow(), true, -// get.getRow(), true, -1); -// } - + if (get.isClosestRowBefore()) { + obTableQuery = buildObTableQuery(filter, null, false, + get.getRow(), true); + obTableQuery.setScanOrder(ObScanOrder.Reverse); + } else { + obTableQuery = buildObTableQuery(filter, get.getRow(), true, + get.getRow(), true); + } + + obTableQuery.setObKVParams(buildObHBaseParams(null, get)); request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString, Bytes.toString(family))); clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient .execute(request); + if (get.isCheckExistenceOnly() ) { + Result result = new Result(); + result.setExists(clientQueryStreamResult.getCacheRows().size() != 0); + return result; + } getKeyValueFromResult(clientQueryStreamResult, keyValueList, false, family); } @@ -583,24 +602,10 @@ public ResultScanner call() throws IOException { if (scan.getFamilyMap().keySet().isEmpty()) { filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(), scan.getMaxVersions(), null); -// obTableQuery = buildObTableQuery(filter, scan); -// -// request = buildObTableQueryAsyncRequest(obTableQuery, -// getTargetTableName(tableNameString)); -// if (scan.isReversed()) { -// obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, -// scan.getStartRow(), true, scan.getBatch()); -// } else { -// obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, -// scan.getStopRow(), false, scan.getBatch()); -// } -// if (scan.isReversed()) { // reverse scan 时设置为逆序 -// obTableQuery.setScanOrder(ObScanOrder.Reverse); -// } -// obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong( -// HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, -// HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE)); -// request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString)); + obTableQuery = buildObTableQuery(filter, scan); + + request = buildObTableQueryAsyncRequest(obTableQuery, + getTargetTableName(tableNameString)); clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient .execute(request); return new ClientStreamScanner(clientQueryAsyncStreamResult, @@ -612,22 +617,6 @@ public ResultScanner call() throws IOException { filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(), scan.getMaxVersions(), entry.getValue()); obTableQuery = buildObTableQuery(filter, scan); -// if (scan.isReversed()) { -// obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false, -// scan.getStartRow(), true, scan.getBatch()); -// } else { -// obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true, -// scan.getStopRow(), false, scan.getBatch()); -// } -// if (scan.isReversed()) { // reverse scan 时设置为逆序 -// obTableQuery.setScanOrder(ObScanOrder.Reverse); -// } -// -// // no support set maxResultSize. -// obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong( -// HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, -// HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE)); - request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString, Bytes.toString(family))); clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient @@ -649,6 +638,23 @@ public ResultScanner call() throws IOException { return executeServerCallable(serverCallable); } + public ObKVParams buildObHBaseParams(Scan scan, Get get) { + ObKVParams obKVParams = new ObKVParams(); + ObHBaseParams obHBaseParams = new ObHBaseParams(); + if (scan != null) { + obHBaseParams.setCaching(scan.getCaching()); + obHBaseParams.setCallTimeout(scannerTimeout); + obHBaseParams.setCacheBlock(scan.isGetScan()); + obHBaseParams.setAllowPartialResults(scan.getAllowPartialResults()); + } + if (get != null) { + obHBaseParams.setCheckExistenceOnly(get.isCheckExistenceOnly()); + obHBaseParams.setCacheBlock(get.getCacheBlocks()); + } + obKVParams.setObParamsBase(obHBaseParams); + return obKVParams; + } + @Override public ResultScanner getScanner(final byte[] family) throws IOException { Scan scan = new Scan(); @@ -1452,6 +1458,10 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) { if (scan.getBatch() > 0) { obTableQuery.setBatchSize(scan.getBatch()); } + obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : configuration.getLong( + HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE)); + obTableQuery.setObKVParams(buildObHBaseParams(scan, null)); return obTableQuery; } diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java b/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java index cc66c24f..6445025b 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java @@ -229,28 +229,6 @@ public int getRpcTimeout() { return ohTable.getRpcTimeout(); } - @Override - public void setOperationTimeout(int operationTimeout) { - checkStatus(); - ohTable.setOperationTimeout(operationTimeout); - } - - @Override - public int getOperationTimeout() { - checkStatus(); - return ohTable.getOperationTimeout(); - } - - @Override - public void setRpcTimeout(int rpcTimeout) { - conf.set(Property.RPC_EXECUTE_TIMEOUT.getKey(), String.valueOf(rpcTimeout)); - } - - @Override - public int getRpcTimeout() { - return Integer.parseInt(conf.get(Property.RPC_EXECUTE_TIMEOUT.getKey())); - } - @Override public byte[] getTableName() { return tableName; diff --git a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java index 35096378..470b2a25 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java +++ b/src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java @@ -109,7 +109,9 @@ public Result next() throws IOException { List keyValues = new ArrayList(); keyValues.add(startKeyValue); - while (streamNext = streamResult.next()) { + int size = streamResult.getCacheRows().size(); + int current = 0; + while (streamNext = streamResult.next() && current < size){ List row = streamResult.getRow(); if (this.isTableGroup) { // split family and qualifier @@ -126,6 +128,7 @@ public Result next() throws IOException { if (Arrays.equals(sk, k)) { // when rowKey is equal to the previous rowKey ,merge the result into the same result keyValues.add(new KeyValue(k, family, q, t, v)); + current++; } else { break; } diff --git a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java index 27b2cd62..5265d4dd 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java +++ b/src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java @@ -18,6 +18,8 @@ package com.alipay.oceanbase.hbase; import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; + +import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; @@ -50,6 +52,109 @@ public abstract class HTableTestBase { protected Table hTable; + @Test + public void testScanWithObParams() throws Exception { + String key1 = "scanKey1x"; + String key2 = "scanKey2x"; + String key3 = "scanKey3x"; + String key4 = "scanKey4x"; + String column1 = "column1"; + String column2 = "column2"; + String value1 = "value1"; + String value2 = "value2"; + String family = "family1"; + + // delete previous data + Delete deleteKey1Family = new Delete(toBytes(key1)); + deleteKey1Family.deleteFamily(toBytes(family)); + Delete deleteKey2Family = new Delete(toBytes(key2)); + deleteKey2Family.deleteFamily(toBytes(family)); + Delete deleteKey3Family = new Delete(toBytes(key3)); + deleteKey3Family.deleteFamily(toBytes(family)); + Delete deleteKey4Family = new Delete(toBytes(key4)); + deleteKey4Family.deleteFamily(toBytes(family)); + + hTable.delete(deleteKey1Family); + hTable.delete(deleteKey2Family); + hTable.delete(deleteKey3Family); + hTable.delete(deleteKey4Family); + + Put putKey1Column1Value1 = new Put(toBytes(key1)); + putKey1Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1)); + + Put putKey1Column1Value2 = new Put(toBytes(key1)); + putKey1Column1Value2.add(toBytes(family), toBytes(column1), toBytes(value2)); + + Put putKey1Column2Value2 = new Put(toBytes(key1)); + putKey1Column2Value2.add(toBytes(family), toBytes(column2), toBytes(value2)); + + Put putKey1Column2Value1 = new Put(toBytes(key1)); + putKey1Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1)); + + Put putKey2Column1Value1 = new Put(toBytes(key2)); + putKey2Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1)); + + Put putKey2Column1Value2 = new Put(toBytes(key2)); + putKey2Column1Value2.add(toBytes(family), toBytes(column1), toBytes(value2)); + + Put putKey2Column2Value2 = new Put(toBytes(key2)); + putKey2Column2Value2.add(toBytes(family), toBytes(column2), toBytes(value2)); + + Put putKey2Column2Value1 = new Put(toBytes(key2)); + putKey2Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1)); + + Put putKey3Column1Value1 = new Put(toBytes(key3)); + putKey3Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1)); + + Put putKey4Column1Value1 = new Put(toBytes(key4)); + putKey4Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1)); + + tryPut(hTable, putKey1Column1Value1); + tryPut(hTable, putKey1Column1Value2); + tryPut(hTable, putKey1Column1Value1); // 2 * putKey1Column1Value1 + tryPut(hTable, putKey1Column2Value1); + tryPut(hTable, putKey1Column2Value2); + tryPut(hTable, putKey1Column2Value1); // 2 * putKey1Column2Value1 + tryPut(hTable, putKey1Column2Value2); // 2 * putKey1Column2Value2 + tryPut(hTable, putKey2Column2Value1); + tryPut(hTable, putKey2Column2Value2); + tryPut(hTable, putKey3Column1Value1); + tryPut(hTable, putKey4Column1Value1); + + Scan scan; + + scan = new Scan(); + scan.addFamily(family.getBytes()); + scan.setStartRow("scanKey1x".getBytes()); + scan.setStopRow("scanKey5x".getBytes()); + scan.setMaxVersions(10); + scan.setCaching(1); + scan.setBatch(3); + ResultScanner scanner = hTable.getScanner(scan); + Result result = scanner.next(); + Assert.assertEquals(3, result.size()); + scanner.close(); + + scan.setMaxResultSize(10); + scan.setBatch(-1); + ResultScanner scanner1 = hTable.getScanner(scan); + result = scanner1.next(); + Assert.assertEquals(7, result.size()); // 返回第一行全部数据,因为不允许行内部分返回 + + scanner1.close(); + + scan.setAllowPartialResults(true); + ResultScanner scanner2 = hTable.getScanner(scan); + result = scanner2.next(); + Assert.assertEquals(1, result.size()); + + + hTable.delete(deleteKey1Family); + hTable.delete(deleteKey2Family); + hTable.delete(deleteKey3Family); + hTable.delete(deleteKey4Family); + } + @Test public void testTableGroup() throws IOError, IOException { /*