From 5369673d70a0f8c131ee4320cc5b9f02a81c20f1 Mon Sep 17 00:00:00 2001 From: wpleonardo Date: Tue, 15 Feb 2022 10:03:32 +0800 Subject: [PATCH 1/3] create buffer and memcpy it to the java buffer --- oap-ape/ape-native/src/reader.cc | 113 +++++++++++++++++++++++++++++-- oap-ape/ape-native/src/reader.h | 2 + 2 files changed, 110 insertions(+), 5 deletions(-) diff --git a/oap-ape/ape-native/src/reader.cc b/oap-ape/ape-native/src/reader.cc index a1073e2f8..a9756d5c7 100644 --- a/oap-ape/ape-native/src/reader.cc +++ b/oap-ape/ape-native/src/reader.cc @@ -17,11 +17,12 @@ #include #include - +#include #include "arrow/util/cpu_info.h" #undef NDEBUG #include +#include #include "src/reader.h" @@ -148,20 +149,63 @@ void convertBitMap(uint8_t* srcBitMap, uint8_t* dstByteMap, int len) { } int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr_) { + //if first batch + //if (totalRowsRead == 0) { + buffersPtrNew_ = new int64_t[sizeof(buffersPtr_)/sizeof(int64_t)]; + nullsPtrNew_ = new int64_t[sizeof(nullsPtr_)/sizeof(int64_t)]; + + for (int i = 0; i < initRequiredColumnCount; i++) { + std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; + switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { + case parquet::Type::BOOLEAN: + (((buffersPtrNew_))[i]) = (int64_t) new bool[batchSize]; + break; + case parquet::Type::INT32: + (((buffersPtrNew_))[i]) = (int64_t) new int32_t[batchSize]; + break; + case parquet::Type::INT64: + (((buffersPtrNew_))[i]) = (int64_t) new int64_t[batchSize]; + break; + case parquet::Type::INT96: + (((buffersPtrNew_))[i]) = (int64_t) new parquet::Int96[batchSize]; + break; + case parquet::Type::FLOAT: + (((buffersPtrNew_))[i]) = (int64_t) new float[batchSize]; + break; + case parquet::Type::DOUBLE: + (((buffersPtrNew_))[i]) = (int64_t) new double[batchSize]; + break; + case parquet::Type::BYTE_ARRAY: + (((buffersPtrNew_))[i]) = (int64_t) new parquet::ByteArray[batchSize]; + // std::cout <<"type = BYTE_ARRAY ,bufferPtr size=" <= totalRows && dumpAggCursor == 0) { return -1; } checkEndOfRowGroup(); - + std::cout<< "checkEndOfRowGroup has done\n"; std::vector buffersPtr(initRequiredColumnCount); std::vector nullsPtr(initRequiredColumnCount); @@ -173,13 +217,15 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr buffersPtr[i] = buffersPtr_[usedInitBufferIndex[i]]; nullsPtr[i] = nullsPtr_[usedInitBufferIndex[i]]; } - + std::cout<< "loop1 has done\n"; allocateExtraBuffers(batchSize, buffersPtr, nullsPtr); - + std::cout<< "allocateExtraBuffers has done\n"; currentBatchSize = batchSize; int rowsRet = 0; if (aggExprs.size() == 0) { // will not do agg + std::cout<< "before read batch has done\n"; int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); + std::cout<< "after read batch has done\n"; totalRowsRead += rowsToRead; ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; rowsRet = doFilter(rowsToRead, buffersPtr, nullsPtr); @@ -241,6 +287,54 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr } } } +std::cout<< "before copy has done\n"; +//COPY THE BUFFER TO THE GIVEN BUFFER WHICH WILL BE RETURN TO THE JAVA PART + for (int i = 0; i < initRequiredColumnCount; i++) { + std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; + switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { + case parquet::Type::BOOLEAN: + memcpy((void *)(((bool*)buffersPtrNew_[i])),(void*)(((bool*)buffersPtr_[i])),sizeof(((bool*)buffersPtrNew_[i]))); + delete((bool*)buffersPtrNew_[i]); + break; + case parquet::Type::INT32: + memcpy((void *)(((int32_t*)buffersPtrNew_[i])),(void*)(((int32_t*)buffersPtr_[i])),sizeof(((int32_t*)buffersPtrNew_[i]))); + delete((int32_t*)buffersPtrNew_[i]); + break; + case parquet::Type::INT64: + memcpy((void *)(((int64_t*)buffersPtrNew_[i])),(void*)(((int64_t*)buffersPtr_[i])),sizeof(((int64_t*)buffersPtrNew_[i]))); + delete((int64_t*)buffersPtrNew_[i]); + break; + case parquet::Type::INT96: + memcpy((void *)(((parquet::Int96*)buffersPtrNew_[i])),(void*)(((parquet::Int96*)buffersPtr_[i])),sizeof(((parquet::Int96*)buffersPtrNew_[i]))); + delete((parquet::Int96*)buffersPtrNew_[i]); + break; + case parquet::Type::FLOAT: + memcpy((void *)(((float*)buffersPtrNew_[i])),(void*)(((float*)buffersPtr_[i])),sizeof(((float*)buffersPtrNew_[i]))); + delete((float*)buffersPtrNew_[i]); + break; + case parquet::Type::DOUBLE: + memcpy((void *)(((double*)buffersPtrNew_[i])),(void*)(((double*)buffersPtr_[i])),sizeof(((double*)buffersPtrNew_[i]))); + delete((double*)buffersPtrNew_[i]); + break; + case parquet::Type::BYTE_ARRAY: + memcpy((void *)(((parquet::ByteArray*)buffersPtrNew_[i])),(void*)(((parquet::ByteArray*)buffersPtr_[i])),sizeof(((parquet::ByteArray*)buffersPtrNew_[i]))); + delete((parquet::ByteArray*)buffersPtrNew_[i]); + break; + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + memcpy((void *)(((parquet::FixedLenByteArray*)buffersPtrNew_[i])),(void*)(((parquet::FixedLenByteArray*)buffersPtr_[i])),sizeof(((parquet::FixedLenByteArray*)buffersPtrNew_[i]))); + delete((parquet::FixedLenByteArray*)buffersPtrNew_[i]); + break; + default: + ARROW_LOG(WARNING) << "Unsupported Type!"; + continue; + + } + + } + memcpy(nullsPtrNew_,nullsPtr_,sizeof(nullsPtrNew_)); + delete(nullsPtrNew_); + + ARROW_LOG(DEBUG) << "ret rows " << rowsRet; return rowsRet; @@ -253,12 +347,15 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, std::vector repLevel(rowsToRead); std::vector nullBitMap(rowsToRead); ARROW_LOG(DEBUG) << "will read " << rowsToRead << " rows"; + std::cout<< "____349_____ \n"; for (int i = 0; i < columnReaders.size(); i++) { int64_t levelsRead = 0, valuesRead = 0, nullCount = 0; int rows = 0; int tmpRows = 0; + std::cout<< "____354_____ \n"; // ReadBatchSpaced API will return rows left in a data page while (rows < rowsToRead) { + std::cout<< "____357_____ \n"; // TODO: refactor. it's ugly, but didn't find some better way. switch (typeVector[i]) { case parquet::Type::BOOLEAN: { @@ -358,7 +455,9 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, ARROW_LOG(WARNING) << "Unsupported Type!"; break; } + std::cout<< "____457_____ \n"; convertBitMap(nullBitMap.data(), (uint8_t*)nullsPtr[i] + rows, tmpRows); + std::cout<< "____459_____ \n"; rows += tmpRows; } assert(rowsToRead == rows); @@ -454,11 +553,15 @@ int Reader::dumpBufferAfterAgg(int groupBySize, int aggExprsSize, int Reader::allocateExtraBuffers(int batchSize, std::vector& buffersPtr, std::vector& nullsPtr) { if (filterExpression) { + std::cout<< "before allocateFilterBuffers has done\n"; allocateFilterBuffers(batchSize); + std::cout<< "allocateFilterBuffers has done\n"; } if (aggExprs.size()) { // todo: group by agg size + std::cout<< "before allocateAggBuffers has done\n"; allocateAggBuffers(batchSize); + std::cout<< "allocateAggBuffers has done\n"; } int filterBufferCount = filterDataBuffers.size(); diff --git a/oap-ape/ape-native/src/reader.h b/oap-ape/ape-native/src/reader.h index 17930ac86..8ddeba1a4 100644 --- a/oap-ape/ape-native/src/reader.h +++ b/oap-ape/ape-native/src/reader.h @@ -164,5 +164,7 @@ class Reader { ApeHashMap map; int32_t dumpAggCursor = 0; + int64_t * buffersPtrNew_; + int64_t * nullsPtrNew_; }; } // namespace ape From 6a278c7b4f5d0febceeea16632bb008a058ef4ee Mon Sep 17 00:00:00 2001 From: jiyu1021 Date: Tue, 1 Mar 2022 21:52:22 -0500 Subject: [PATCH 2/3] can run with buffer created in C++ --- oap-ape/ape-native/src/reader.cc | 251 +++++++++++++++++++++---------- oap-ape/ape-native/src/reader.h | 6 +- 2 files changed, 174 insertions(+), 83 deletions(-) diff --git a/oap-ape/ape-native/src/reader.cc b/oap-ape/ape-native/src/reader.cc index a9756d5c7..d21ea11c9 100644 --- a/oap-ape/ape-native/src/reader.cc +++ b/oap-ape/ape-native/src/reader.cc @@ -149,13 +149,15 @@ void convertBitMap(uint8_t* srcBitMap, uint8_t* dstByteMap, int len) { } int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr_) { + std::cout<< "initRequiredColumnCount is "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; + nullsPtrNew_[i] = (int64_t) new bool[batchSize]; switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { case parquet::Type::BOOLEAN: (((buffersPtrNew_))[i]) = (int64_t) new bool[batchSize]; @@ -177,7 +179,8 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr break; case parquet::Type::BYTE_ARRAY: (((buffersPtrNew_))[i]) = (int64_t) new parquet::ByteArray[batchSize]; - // std::cout <<"type = BYTE_ARRAY ,bufferPtr size=" < buffersPtr(initRequiredColumnCount); std::vector nullsPtr(initRequiredColumnCount); - + std::vector buffersPtrReal(initRequiredColumnCount); + std::vector nullsPtrReal(initRequiredColumnCount); // Not all input buffers can be used for column data loading. // espeically when agg pushing down is enabled. // E.g. input buffers could be in types of "tbl_col_a, sum(tbl_col_b)", // in which only the first buffer can be used for column data loading. for (int i = 0; i < usedInitBufferIndex.size(); i++) { - buffersPtr[i] = buffersPtr_[usedInitBufferIndex[i]]; - nullsPtr[i] = nullsPtr_[usedInitBufferIndex[i]]; + buffersPtr[i] = buffersPtrNew_[usedInitBufferIndex[i]]; + buffersPtrReal[i] = buffersPtr_[usedInitBufferIndex[i]]; + nullsPtr[i] = nullsPtrNew_[usedInitBufferIndex[i]]; + nullsPtrReal[i] = nullsPtr_[usedInitBufferIndex[i]]; } - std::cout<< "loop1 has done\n"; - allocateExtraBuffers(batchSize, buffersPtr, nullsPtr); - std::cout<< "allocateExtraBuffers has done\n"; + std::cout< nullVector(1); - results[i].nullVector = std::make_shared>(nullVector); - } - while (totalRowsRead < totalRows && !checkEndOfRowGroup()) { - int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); - totalRowsRead += rowsToRead; - ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; - - int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr); - ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter; - - int tmp = - doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr); - // if the last batch are empty after filter, it will return 0 regard less of the - // group num - if (tmp != 0) rowsRet = tmp; - } - int rowsDump = rowsRet; - if (rowsRet > batchSize) { - rowsDump = batchSize; - dumpAggCursor = batchSize; - } + + + + + } + // else { + + // if (dumpAggCursor == 0) { // will read a whole RowGroup and do agg + // results.resize(aggExprs.size()); + // for (int i = 0; i < aggExprs.size(); i++) { + // std::vector nullVector(1); + // results[i].nullVector = std::make_shared>(nullVector); + // } + // while (totalRowsRead < totalRows && !checkEndOfRowGroup()) { + // int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); + // totalRowsRead += rowsToRead; + // ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; + + // int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr); + // ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter; + + // int tmp = + // doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr); + // // if the last batch are empty after filter, it will return 0 regard less of the + // // group num + // if (tmp != 0) rowsRet = tmp; + // } + // int rowsDump = rowsRet; + // if (rowsRet > batchSize) { + // rowsDump = batchSize; + // dumpAggCursor = batchSize; + // } + + // if (aggExprs.size()) { + // dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, + // buffersPtr_, nullsPtr_, 0, rowsDump); + // } + // if (rowsRet <= + // batchSize) { // return all result in one call, so clear buffers here. + // map.clear(); + // keys.clear(); + // results.clear(); + // } + // rowsRet = rowsDump; + // } else { // this row group aggregation result is more than default batch size, we + // // will return them via mutilple call + // rowsRet = ((keys.size() - dumpAggCursor) > batchSize) + // ? batchSize + // : ((keys.size() - dumpAggCursor)); + // if (aggExprs.size()) { + // dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, + // buffersPtr_, nullsPtr_, dumpAggCursor, rowsRet); + // } + // if ((keys.size() - dumpAggCursor) <= + // batchSize) { // the last batch, let's clear buffers + // map.clear(); + // keys.clear(); + // results.clear(); + // dumpAggCursor = 0; + // } else { + // dumpAggCursor += batchSize; + // } + // } + // } + for (int i=0;i batchSize) - ? batchSize - : ((keys.size() - dumpAggCursor)); - if (aggExprs.size()) { - dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, - buffersPtr_, nullsPtr_, dumpAggCursor, rowsRet); - } - if ((keys.size() - dumpAggCursor) <= - batchSize) { // the last batch, let's clear buffers - map.clear(); - keys.clear(); - results.clear(); - dumpAggCursor = 0; - } else { - dumpAggCursor += batchSize; - } + switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { + + case parquet::Type::BOOLEAN: + memcpy((((bool*)(buffersPtrReal[i]))),(((bool*)(buffersPtr[i]))),batchSize*sizeof(bool)); + break; + case parquet::Type::INT32: + memcpy((((int32_t*)(buffersPtrReal[i]))),(((int32_t*)(buffersPtr[i]))),batchSize*sizeof(int32_t)); + break; + case parquet::Type::INT64: + memcpy((((int64_t*)(buffersPtrReal[i]))),(((int64_t*)(buffersPtr[i]))),batchSize*sizeof(int64_t)); + break; + case parquet::Type::INT96: + memcpy((((parquet::Int96*)(buffersPtrReal[i]))),(((parquet::Int96*)(buffersPtr[i]))),batchSize*sizeof(parquet::Int96)); + break; + case parquet::Type::FLOAT: + memcpy((((float*)(buffersPtrReal[i]))),(((float*)(buffersPtr[i]))),batchSize*sizeof(float)); + break; + case parquet::Type::DOUBLE: + memcpy((((double*)(buffersPtrReal[i]))),(((double*)(buffersPtr[i]))),batchSize*sizeof(double)); + break; + case parquet::Type::BYTE_ARRAY: + memcpy((((parquet::ByteArray*)(buffersPtrReal[i]))),(((parquet::ByteArray*)(buffersPtr[i]))),batchSize*sizeof(parquet::ByteArray)); + break; + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + memcpy((void *)(((parquet::FixedLenByteArray*)buffersPtr_[i])),(void*)(((parquet::FixedLenByteArray*)buffersPtrNew_[i])),batchSize*sizeof(parquet::FixedLenByteArray)); + delete((parquet::FixedLenByteArray*)buffersPtrNew_[i]); + memcpy((((parquet::FixedLenByteArray*)(buffersPtrReal[i]))),(((parquet::FixedLenByteArray*)(buffersPtr[i]))),batchSize*sizeof(parquet::FixedLenByteArray)); + break; + default: + ARROW_LOG(WARNING) << "Unsupported Type!"; + continue; + } + + } std::cout<< "before copy has done\n"; //COPY THE BUFFER TO THE GIVEN BUFFER WHICH WILL BE RETURN TO THE JAVA PART for (int i = 0; i < initRequiredColumnCount; i++) { - std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; + std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; + delete((bool*)nullsPtrNew_[i]); switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { + case parquet::Type::BOOLEAN: - memcpy((void *)(((bool*)buffersPtrNew_[i])),(void*)(((bool*)buffersPtr_[i])),sizeof(((bool*)buffersPtrNew_[i]))); delete((bool*)buffersPtrNew_[i]); break; case parquet::Type::INT32: - memcpy((void *)(((int32_t*)buffersPtrNew_[i])),(void*)(((int32_t*)buffersPtr_[i])),sizeof(((int32_t*)buffersPtrNew_[i]))); delete((int32_t*)buffersPtrNew_[i]); break; case parquet::Type::INT64: - memcpy((void *)(((int64_t*)buffersPtrNew_[i])),(void*)(((int64_t*)buffersPtr_[i])),sizeof(((int64_t*)buffersPtrNew_[i]))); delete((int64_t*)buffersPtrNew_[i]); break; case parquet::Type::INT96: - memcpy((void *)(((parquet::Int96*)buffersPtrNew_[i])),(void*)(((parquet::Int96*)buffersPtr_[i])),sizeof(((parquet::Int96*)buffersPtrNew_[i]))); delete((parquet::Int96*)buffersPtrNew_[i]); break; case parquet::Type::FLOAT: - memcpy((void *)(((float*)buffersPtrNew_[i])),(void*)(((float*)buffersPtr_[i])),sizeof(((float*)buffersPtrNew_[i]))); delete((float*)buffersPtrNew_[i]); break; case parquet::Type::DOUBLE: - memcpy((void *)(((double*)buffersPtrNew_[i])),(void*)(((double*)buffersPtr_[i])),sizeof(((double*)buffersPtrNew_[i]))); delete((double*)buffersPtrNew_[i]); break; case parquet::Type::BYTE_ARRAY: - memcpy((void *)(((parquet::ByteArray*)buffersPtrNew_[i])),(void*)(((parquet::ByteArray*)buffersPtr_[i])),sizeof(((parquet::ByteArray*)buffersPtrNew_[i]))); delete((parquet::ByteArray*)buffersPtrNew_[i]); break; case parquet::Type::FIXED_LEN_BYTE_ARRAY: - memcpy((void *)(((parquet::FixedLenByteArray*)buffersPtrNew_[i])),(void*)(((parquet::FixedLenByteArray*)buffersPtr_[i])),sizeof(((parquet::FixedLenByteArray*)buffersPtrNew_[i]))); delete((parquet::FixedLenByteArray*)buffersPtrNew_[i]); break; default: @@ -331,12 +393,13 @@ std::cout<< "before copy has done\n"; } } - memcpy(nullsPtrNew_,nullsPtr_,sizeof(nullsPtrNew_)); - delete(nullsPtrNew_); - + delete(buffersPtrNew_); + delete(nullsPtrNew_); +std::cout<< " copy has done, copy size = "<& buffersPtr, - std::vector& nullsPtr) { + std::vector& nullsPtr, std::vector& buffersPtrReal, + std::vector& nullsPtrReal) { if (filterExpression) { std::cout<< "before allocateFilterBuffers has done\n"; allocateFilterBuffers(batchSize); @@ -573,10 +637,14 @@ int Reader::allocateExtraBuffers(int batchSize, std::vector& buffersPtr buffersPtr.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); nullsPtr.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); + buffersPtrReal.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); + nullsPtrReal.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); for (int i = 0; i < filterBufferCount; i++) { buffersPtr[initRequiredColumnCount + i] = (int64_t)filterDataBuffers[i]; nullsPtr[initRequiredColumnCount + i] = (int64_t)filterNullBuffers[i]; + buffersPtrReal[initRequiredColumnCount + i] = (int64_t)filterDataBuffers[i]; + nullsPtrReal[initRequiredColumnCount + i] = (int64_t)filterNullBuffers[i]; } for (int i = 0; i < aggBufferCount; i++) { @@ -584,11 +652,15 @@ int Reader::allocateExtraBuffers(int batchSize, std::vector& buffersPtr (int64_t)aggDataBuffers[i]; nullsPtr[initRequiredColumnCount + filterBufferCount + i] = (int64_t)aggNullBuffers[i]; + buffersPtrReal[initRequiredColumnCount + filterBufferCount + i] =(int64_t)aggDataBuffers[i]; + nullsPtrReal[initRequiredColumnCount + filterBufferCount + i] =(int64_t)aggNullBuffers[i]; } } return initRequiredColumnCount + filterBufferCount + aggBufferCount; } + + bool Reader::hasNext() { return dumpAggCursor > 0 || columnReaders[0]->HasNext(); } bool Reader::skipNextRowGroup() { @@ -730,57 +802,74 @@ void Reader::setFilter(std::string filterJsonStr) { } int Reader::allocateFilterBuffers(int batchSize) { + std::cout<< "751\n"; if (!filterReset && batchSize <= currentBatchSize) { + std::cout<< "753\n"; return 0; } filterReset = false; - +std::cout<< "757\n"; // free current filter buffers freeFilterBuffers(); - +std::cout<< "760\n"; // allocate new filter buffers int extraBufferNum = 0; for (int i = initRequiredColumnCount; i < initPlusFilterRequiredColumnCount; i++) { + std::cout<< "764\n"; int columnIndex = requiredColumnIndex[i]; // allocate memory buffer char* dataBuffer; switch (fileMetaData->schema()->Column(columnIndex)->physical_type()) { case parquet::Type::BOOLEAN: dataBuffer = (char*)new bool[batchSize]; + std::cout<< "770\n"; break; case parquet::Type::INT32: dataBuffer = (char*)new int32_t[batchSize]; + std::cout<< "775\n"; break; case parquet::Type::INT64: dataBuffer = (char*)new int64_t[batchSize]; + std::cout<< "779\n"; break; case parquet::Type::INT96: dataBuffer = (char*)new parquet::Int96[batchSize]; + std::cout<< "783\n"; break; case parquet::Type::FLOAT: dataBuffer = (char*)new float[batchSize]; + std::cout<< "787\n"; break; case parquet::Type::DOUBLE: dataBuffer = (char*)new double[batchSize]; + std::cout<< "791\n"; break; case parquet::Type::BYTE_ARRAY: dataBuffer = (char*)new parquet::ByteArray[batchSize]; + std::cout<< "795\n"; break; case parquet::Type::FIXED_LEN_BYTE_ARRAY: dataBuffer = (char*)new parquet::FixedLenByteArray[batchSize]; + std::cout<< "799\n"; break; default: ARROW_LOG(WARNING) << "Unsupported Type!"; + std::cout<< "803\n"; continue; } char* nullBuffer = new char[batchSize]; + std::cout<< "808\n"; filterDataBuffers.push_back(dataBuffer); + std::cout<< "810\n"; filterNullBuffers.push_back(nullBuffer); + std::cout<< "812\n"; extraBufferNum++; + std::cout<< "815\n"; } ARROW_LOG(INFO) << "create extra filter buffers count: " << extraBufferNum; + std::cout<< "819\n"; return extraBufferNum; } diff --git a/oap-ape/ape-native/src/reader.h b/oap-ape/ape-native/src/reader.h index 8ddeba1a4..02ab6f8c1 100644 --- a/oap-ape/ape-native/src/reader.h +++ b/oap-ape/ape-native/src/reader.h @@ -94,7 +94,8 @@ class Reader { std::vector& nullsPtr); int allocateExtraBuffers(int batchSize, std::vector& buffersPtr, - std::vector& nullsPtr); + std::vector& nullsPtr, std::vector& buffersPtrReal, + std::vector& nullsPtrReal) ; int dumpBufferAfterAgg(int groupBySize, int aggExprsSize, const std::vector& keys, const std::vector& results, int64_t* oriBufferPtr, @@ -138,7 +139,8 @@ class Reader { std::vector filterColumnNames; std::vector filterDataBuffers; std::vector filterNullBuffers; - + std::vector filterDataBuffersReal; + std::vector filterNullBuffersReal; int initPlusFilterRequiredColumnCount = 0; bool aggReset = false; std::vector aggColumnNames; From ec0bf849ae52d5e61498a0364093d3c01392bc8f Mon Sep 17 00:00:00 2001 From: jiyu1021 Date: Fri, 4 Mar 2022 02:13:17 -0500 Subject: [PATCH 3/3] async done, next step enable IAA --- oap-ape/ape-native/src/reader.cc | 568 +++++++++++++++++++++---------- oap-ape/ape-native/src/reader.h | 9 +- 2 files changed, 400 insertions(+), 177 deletions(-) diff --git a/oap-ape/ape-native/src/reader.cc b/oap-ape/ape-native/src/reader.cc index d21ea11c9..aae91cdbb 100644 --- a/oap-ape/ape-native/src/reader.cc +++ b/oap-ape/ape-native/src/reader.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include "arrow/util/cpu_info.h" #undef NDEBUG @@ -26,6 +27,8 @@ #include "src/reader.h" + + namespace ape { Reader::Reader() {} @@ -149,12 +152,119 @@ void convertBitMap(uint8_t* srcBitMap, uint8_t* dstByteMap, int len) { } int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr_) { - std::cout<< "initRequiredColumnCount is "<= totalRows && dumpAggCursor == 0) { + return -1; + } + std::cout<<"161currentRowGroup before check = "< buffersPtr(initRequiredColumnCount); + std::vector nullsPtr(initRequiredColumnCount); + for (int i = 0; i < usedInitBufferIndex.size(); i++) { + buffersPtr[i] = buffersPtr_[usedInitBufferIndex[i]]; + nullsPtr[i] = nullsPtr_[usedInitBufferIndex[i]]; + } + allocateExtraBuffersOrigin(batchSize, buffersPtr, nullsPtr); + currentBatchSize = batchSize; + int rowsRet = 0; + if (dumpAggCursor == 0) { // will read a whole RowGroup and do agg + results.resize(aggExprs.size()); + for (int i = 0; i < aggExprs.size(); i++) { + std::vector nullVector(1); + results[i].nullVector = std::make_shared>(nullVector); + } + while (totalRowsRead < totalRows && !checkEndOfRowGroup()) { + int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); + totalRowsRead += rowsToRead; + ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; + + int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr); + ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter; + + int tmp = + doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr); + // if the last batch are empty after filter, it will return 0 regard less of the + // group num + if (tmp != 0) rowsRet = tmp; + } + int rowsDump = rowsRet; + if (rowsRet > batchSize) { + rowsDump = batchSize; + dumpAggCursor = batchSize; + } + + if (aggExprs.size()) { + dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, + buffersPtr_, nullsPtr_, 0, rowsDump); + } + if (rowsRet <= + batchSize) { // return all result in one call, so clear buffers here. + map.clear(); + keys.clear(); + results.clear(); + } + rowsRet = rowsDump; + } else { // this row group aggregation result is more than default batch size, we + // will return them via mutilple call + rowsRet = ((keys.size() - dumpAggCursor) > batchSize) + ? batchSize + : ((keys.size() - dumpAggCursor)); + if (aggExprs.size()) { + dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, + buffersPtr_, nullsPtr_, dumpAggCursor, rowsRet); + } + if ((keys.size() - dumpAggCursor) <= + batchSize) { // the last batch, let's clear buffers + map.clear(); + keys.clear(); + results.clear(); + dumpAggCursor = 0; + } else { + dumpAggCursor += batchSize; + } + } + ARROW_LOG(DEBUG) << "ret rows " << rowsRet; + return rowsRet; + } else { +// no agg //if first batch - //if (totalRowsRead == 0) { - buffersPtrNew_ = new int64_t[initRequiredColumnCount]; - nullsPtrNew_ = new int64_t[initRequiredColumnCount]; - + if (totalRowsRead == 0){ + std::cout<<"this is the first BATCH!!!!!!!!!!!!!!!!!!!!!!!\n"; + preBufferRowGroups(); + initRowGroupReaders(); + if (totalRowsRead >= totalRows && dumpAggCursor == 0) { + std::cout<<"________________totalRowsRead = "< buffersPtr(initRequiredColumnCount); + std::vector nullsPtr(initRequiredColumnCount); + for (int i = 0; i < usedInitBufferIndex.size(); i++) { + buffersPtr[i] = buffersPtr_[usedInitBufferIndex[i]]; + nullsPtr[i] = nullsPtr_[usedInitBufferIndex[i]]; + } + std::cout<<"FIRST BATCH NORMAL ALLOCATE\n"; + allocateExtraBuffersOrigin(batchSize, buffersPtr, nullsPtr); + currentBatchSize = batchSize; + int rowsRet = 0; + int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); + totalRowsRead += rowsToRead; + ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; + rowsRet = doFilter(rowsToRead, buffersPtr, nullsPtr); + //read next batch + buffersPtrNew_ = new int64_t[initRequiredColumnCount]; + nullsPtrNew_ = new int64_t[initRequiredColumnCount]; + //prepare the buffer for (int i = 0; i < initRequiredColumnCount; i++) { std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; nullsPtrNew_[i] = (int64_t) new bool[batchSize]; @@ -188,219 +298,290 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr break; default: ARROW_LOG(WARNING) << "Unsupported Type!"; - continue; - + continue; } - - } - //} - std::cout<< "create buffer has done\n"; - -// for (int i=0;i= totalRows && dumpAggCursor == 0) { - return -1; + std::cout<<"________________totalRowsRead = "<(initRequiredColumnCount); + nullsPtrNext=new std::vector(initRequiredColumnCount); + for (int i = 0; i < usedInitBufferIndex.size(); i++) { + (*buffersPtrNext)[i] = buffersPtrNew_[usedInitBufferIndex[i]]; + (*nullsPtrNext)[i] = nullsPtrNew_[usedInitBufferIndex[i]]; + } + std::cout<<"FIRST BATCH NEW ALLOCATE\n"; + int newLength = allocateExtraBuffersOrigin(batchSize, (*buffersPtrNext), (*nullsPtrNext)); + currentBatchSize = batchSize; + rowsToReadNext = doReadBatch(batchSize, (*buffersPtrNext), (*nullsPtrNext)); + totalRowsRead += rowsToReadNext; + std::cout<<"________________totalRowsRead = "< buffersPtr(initRequiredColumnCount); std::vector nullsPtr(initRequiredColumnCount); - std::vector buffersPtrReal(initRequiredColumnCount); - std::vector nullsPtrReal(initRequiredColumnCount); - // Not all input buffers can be used for column data loading. - // espeically when agg pushing down is enabled. - // E.g. input buffers could be in types of "tbl_col_a, sum(tbl_col_b)", - // in which only the first buffer can be used for column data loading. for (int i = 0; i < usedInitBufferIndex.size(); i++) { - buffersPtr[i] = buffersPtrNew_[usedInitBufferIndex[i]]; - buffersPtrReal[i] = buffersPtr_[usedInitBufferIndex[i]]; - nullsPtr[i] = nullsPtrNew_[usedInitBufferIndex[i]]; - nullsPtrReal[i] = nullsPtr_[usedInitBufferIndex[i]]; + buffersPtr[i] = buffersPtr_[usedInitBufferIndex[i]]; + nullsPtr[i] = nullsPtr_[usedInitBufferIndex[i]]; } - std::cout< nullVector(1); - // results[i].nullVector = std::make_shared>(nullVector); - // } - // while (totalRowsRead < totalRows && !checkEndOfRowGroup()) { - // int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); - // totalRowsRead += rowsToRead; - // ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; - - // int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr); - // ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter; - - // int tmp = - // doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr); - // // if the last batch are empty after filter, it will return 0 regard less of the - // // group num - // if (tmp != 0) rowsRet = tmp; - // } - // int rowsDump = rowsRet; - // if (rowsRet > batchSize) { - // rowsDump = batchSize; - // dumpAggCursor = batchSize; - // } - - // if (aggExprs.size()) { - // dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, - // buffersPtr_, nullsPtr_, 0, rowsDump); - // } - // if (rowsRet <= - // batchSize) { // return all result in one call, so clear buffers here. - // map.clear(); - // keys.clear(); - // results.clear(); - // } - // rowsRet = rowsDump; - // } else { // this row group aggregation result is more than default batch size, we - // // will return them via mutilple call - // rowsRet = ((keys.size() - dumpAggCursor) > batchSize) - // ? batchSize - // : ((keys.size() - dumpAggCursor)); - // if (aggExprs.size()) { - // dumpBufferAfterAgg(groupByExprs.size(), aggExprs.size(), keys, results, - // buffersPtr_, nullsPtr_, dumpAggCursor, rowsRet); - // } - // if ((keys.size() - dumpAggCursor) <= - // batchSize) { // the last batch, let's clear buffers - // map.clear(); - // keys.clear(); - // results.clear(); - // dumpAggCursor = 0; - // } else { - // dumpAggCursor += batchSize; - // } - // } - // } for (int i=0;ischema()->Column(requiredColumnIndex[i])->physical_type()) { case parquet::Type::BOOLEAN: - memcpy((((bool*)(buffersPtrReal[i]))),(((bool*)(buffersPtr[i]))),batchSize*sizeof(bool)); + memcpy((((bool*)(buffersPtr[i]))),(((bool*)((*buffersPtrNext)[i]))),batchSize*sizeof(bool)); break; case parquet::Type::INT32: - memcpy((((int32_t*)(buffersPtrReal[i]))),(((int32_t*)(buffersPtr[i]))),batchSize*sizeof(int32_t)); + memcpy((((int32_t*)(buffersPtr[i]))),(((int32_t*)((*buffersPtrNext)[i]))),batchSize*sizeof(int32_t)); break; case parquet::Type::INT64: - memcpy((((int64_t*)(buffersPtrReal[i]))),(((int64_t*)(buffersPtr[i]))),batchSize*sizeof(int64_t)); + memcpy((((int64_t*)(buffersPtr[i]))),(((int64_t*)((*buffersPtrNext)[i]))),batchSize*sizeof(int64_t)); break; case parquet::Type::INT96: - memcpy((((parquet::Int96*)(buffersPtrReal[i]))),(((parquet::Int96*)(buffersPtr[i]))),batchSize*sizeof(parquet::Int96)); + memcpy((((parquet::Int96*)(buffersPtr[i]))),(((parquet::Int96*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::Int96)); break; case parquet::Type::FLOAT: - memcpy((((float*)(buffersPtrReal[i]))),(((float*)(buffersPtr[i]))),batchSize*sizeof(float)); + memcpy((((float*)(buffersPtr[i]))),(((float*)((*buffersPtrNext)[i]))),batchSize*sizeof(float)); break; case parquet::Type::DOUBLE: - memcpy((((double*)(buffersPtrReal[i]))),(((double*)(buffersPtr[i]))),batchSize*sizeof(double)); + memcpy((((double*)(buffersPtr[i]))),(((double*)((*buffersPtrNext)[i]))),batchSize*sizeof(double)); break; case parquet::Type::BYTE_ARRAY: - memcpy((((parquet::ByteArray*)(buffersPtrReal[i]))),(((parquet::ByteArray*)(buffersPtr[i]))),batchSize*sizeof(parquet::ByteArray)); + memcpy((((parquet::ByteArray*)(buffersPtr[i]))),(((parquet::ByteArray*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::ByteArray)); break; case parquet::Type::FIXED_LEN_BYTE_ARRAY: - memcpy((void *)(((parquet::FixedLenByteArray*)buffersPtr_[i])),(void*)(((parquet::FixedLenByteArray*)buffersPtrNew_[i])),batchSize*sizeof(parquet::FixedLenByteArray)); - delete((parquet::FixedLenByteArray*)buffersPtrNew_[i]); - memcpy((((parquet::FixedLenByteArray*)(buffersPtrReal[i]))),(((parquet::FixedLenByteArray*)(buffersPtr[i]))),batchSize*sizeof(parquet::FixedLenByteArray)); + memcpy((((parquet::FixedLenByteArray*)(buffersPtr[i]))),(((parquet::FixedLenByteArray*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::FixedLenByteArray)); break; default: ARROW_LOG(WARNING) << "Unsupported Type!"; continue; } - + } + //int rowsRet = doFilter(rowsToReadNext, buffersPtr,nullsPtr); + + preBufferRowGroups(); + initRowGroupReaders(); + if (totalRowsRead >= totalRows && dumpAggCursor == 0) { + int rowsRet = doFilter(rowsToReadNext, buffersPtr,nullsPtr); + std::cout<<"not the first batch but read finished \n"; + std::cout<<"________________totalRowsRead = "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; - delete((bool*)nullsPtrNew_[i]); - switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { - - case parquet::Type::BOOLEAN: - delete((bool*)buffersPtrNew_[i]); - break; - case parquet::Type::INT32: - delete((int32_t*)buffersPtrNew_[i]); - break; - case parquet::Type::INT64: - delete((int64_t*)buffersPtrNew_[i]); - break; - case parquet::Type::INT96: - delete((parquet::Int96*)buffersPtrNew_[i]); - break; - case parquet::Type::FLOAT: - delete((float*)buffersPtrNew_[i]); - break; - case parquet::Type::DOUBLE: - delete((double*)buffersPtrNew_[i]); - break; - case parquet::Type::BYTE_ARRAY: - delete((parquet::ByteArray*)buffersPtrNew_[i]); - break; - case parquet::Type::FIXED_LEN_BYTE_ARRAY: - delete((parquet::FixedLenByteArray*)buffersPtrNew_[i]); - break; - default: - ARROW_LOG(WARNING) << "Unsupported Type!"; - continue; - - } - + + } - delete(buffersPtrNew_); - delete(nullsPtrNew_); -std::cout<< " copy has done, copy size = "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; +// nullsPtrNew_[i] = (int64_t) new bool[batchSize]; +// switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { +// case parquet::Type::BOOLEAN: +// (((buffersPtrNew_))[i]) = (int64_t) new bool[batchSize]; +// break; +// case parquet::Type::INT32: +// (((buffersPtrNew_))[i]) = (int64_t) new int32_t[batchSize]; +// break; +// case parquet::Type::INT64: +// (((buffersPtrNew_))[i]) = (int64_t) new int64_t[batchSize]; +// break; +// case parquet::Type::INT96: +// (((buffersPtrNew_))[i]) = (int64_t) new parquet::Int96[batchSize]; +// break; +// case parquet::Type::FLOAT: +// (((buffersPtrNew_))[i]) = (int64_t) new float[batchSize]; +// break; +// case parquet::Type::DOUBLE: +// (((buffersPtrNew_))[i]) = (int64_t) new double[batchSize]; +// break; +// case parquet::Type::BYTE_ARRAY: +// (((buffersPtrNew_))[i]) = (int64_t) new parquet::ByteArray[batchSize]; +// std::cout <<"type size = "<= totalRows && dumpAggCursor == 0) { +// return -1; +// } +// checkEndOfRowGroup(); +// std::cout<< "checkEndOfRowGroup has done\n"; +// buffersPtrNext=new std::vector(initRequiredColumnCount); +// nullsPtrNext=new std::vector(initRequiredColumnCount); +// std::vector buffersPtrReal(initRequiredColumnCount); +// std::vector nullsPtrReal(initRequiredColumnCount); +// for (int i = 0; i < usedInitBufferIndex.size(); i++) { +// (*buffersPtrNext)[i] = buffersPtrNew_[usedInitBufferIndex[i]]; +// buffersPtrReal[i] = buffersPtr_[usedInitBufferIndex[i]]; +// (*nullsPtrNext)[i] = nullsPtrNew_[usedInitBufferIndex[i]]; +// nullsPtrReal[i] = nullsPtr_[usedInitBufferIndex[i]]; +// } +// std::cout<schema()->Column(requiredColumnIndex[i])->physical_type()) { + +// case parquet::Type::BOOLEAN: +// memcpy((((bool*)(buffersPtrReal[i]))),(((bool*)((*buffersPtrNext)[i]))),batchSize*sizeof(bool)); +// break; +// case parquet::Type::INT32: +// memcpy((((int32_t*)(buffersPtrReal[i]))),(((int32_t*)((*buffersPtrNext)[i]))),batchSize*sizeof(int32_t)); +// break; +// case parquet::Type::INT64: +// memcpy((((int64_t*)(buffersPtrReal[i]))),(((int64_t*)((*buffersPtrNext)[i]))),batchSize*sizeof(int64_t)); +// break; +// case parquet::Type::INT96: +// memcpy((((parquet::Int96*)(buffersPtrReal[i]))),(((parquet::Int96*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::Int96)); +// break; +// case parquet::Type::FLOAT: +// memcpy((((float*)(buffersPtrReal[i]))),(((float*)((*buffersPtrNext)[i]))),batchSize*sizeof(float)); +// break; +// case parquet::Type::DOUBLE: +// memcpy((((double*)(buffersPtrReal[i]))),(((double*)((*buffersPtrNext)[i]))),batchSize*sizeof(double)); +// break; +// case parquet::Type::BYTE_ARRAY: +// memcpy((((parquet::ByteArray*)(buffersPtrReal[i]))),(((parquet::ByteArray*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::ByteArray)); +// break; +// case parquet::Type::FIXED_LEN_BYTE_ARRAY: +// memcpy((((parquet::FixedLenByteArray*)(buffersPtrReal[i]))),(((parquet::FixedLenByteArray*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::FixedLenByteArray)); +// break; +// default: +// ARROW_LOG(WARNING) << "Unsupported Type!"; +// continue; + +// } +// } +// std::cout<< "before copy has done\n"; +// //COPY THE BUFFER TO THE GIVEN BUFFER WHICH WILL BE RETURN TO THE JAVA PART +// for (int i = 0; i < initRequiredColumnCount; i++) { +// std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; +// delete((bool*)nullsPtrNew_[i]); +// switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { + +// case parquet::Type::BOOLEAN: +// delete((bool*)buffersPtrNew_[i]); +// break; +// case parquet::Type::INT32: +// delete((int32_t*)buffersPtrNew_[i]); +// break; +// case parquet::Type::INT64: +// delete((int64_t*)buffersPtrNew_[i]); +// break; +// case parquet::Type::INT96: +// delete((parquet::Int96*)buffersPtrNew_[i]); +// break; +// case parquet::Type::FLOAT: +// delete((float*)buffersPtrNew_[i]); +// break; +// case parquet::Type::DOUBLE: +// delete((double*)buffersPtrNew_[i]); +// break; +// case parquet::Type::BYTE_ARRAY: +// delete((parquet::ByteArray*)buffersPtrNew_[i]); +// break; +// case parquet::Type::FIXED_LEN_BYTE_ARRAY: +// delete((parquet::FixedLenByteArray*)buffersPtrNew_[i]); +// break; +// default: +// ARROW_LOG(WARNING) << "Unsupported Type!"; +// continue; +// } +// } +// delete(buffersPtrNew_); +// delete(nullsPtrNew_); +// std::cout<< " copy has done, copy size = "<& buffersPtr, @@ -537,8 +718,10 @@ int Reader::doFilter(int batchSize, std::vector& buffersPtr, int rowsRet = filterExpression->ExecuteWithParam(batchSize, buffersPtr, nullsPtr, tmp); filterTime += std::chrono::steady_clock::now() - start; + multiThreadRowsRet=rowsRet; return rowsRet; } + multiThreadRowsRet=batchSize; return batchSize; } @@ -612,7 +795,40 @@ int Reader::dumpBufferAfterAgg(int groupBySize, int aggExprsSize, return 0; } +int Reader::allocateExtraBuffersOrigin(int batchSize, std::vector& buffersPtr, + std::vector& nullsPtr) { + if (filterExpression) { + allocateFilterBuffers(batchSize); + } + if (aggExprs.size()) { // todo: group by agg size + allocateAggBuffers(batchSize); + } + + int filterBufferCount = filterDataBuffers.size(); + int aggBufferCount = aggDataBuffers.size(); + + if (filterBufferCount > 0 || aggBufferCount > 0) { + ARROW_LOG(DEBUG) << "use extra filter buffers count: " << filterBufferCount + << "use extra agg buffers count: " << aggBufferCount; + + buffersPtr.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); + nullsPtr.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); + + for (int i = 0; i < filterBufferCount; i++) { + buffersPtr[initRequiredColumnCount + i] = (int64_t)filterDataBuffers[i]; + nullsPtr[initRequiredColumnCount + i] = (int64_t)filterNullBuffers[i]; + } + + for (int i = 0; i < aggBufferCount; i++) { + buffersPtr[initRequiredColumnCount + filterBufferCount + i] = + (int64_t)aggDataBuffers[i]; + nullsPtr[initRequiredColumnCount + filterBufferCount + i] = + (int64_t)aggNullBuffers[i]; + } + } + return initRequiredColumnCount + filterBufferCount + aggBufferCount; +} int Reader::allocateExtraBuffers(int batchSize, std::vector& buffersPtr, std::vector& nullsPtr, std::vector& buffersPtrReal, std::vector& nullsPtrReal) { diff --git a/oap-ape/ape-native/src/reader.h b/oap-ape/ape-native/src/reader.h index 02ab6f8c1..460393c4b 100644 --- a/oap-ape/ape-native/src/reader.h +++ b/oap-ape/ape-native/src/reader.h @@ -67,6 +67,7 @@ class Reader { void setPreBufferEnabled(bool isEnabled); static bool isNativeEnabled(); + private: void convertSchema(std::string requiredColumnName); @@ -96,7 +97,8 @@ class Reader { int allocateExtraBuffers(int batchSize, std::vector& buffersPtr, std::vector& nullsPtr, std::vector& buffersPtrReal, std::vector& nullsPtrReal) ; - + int allocateExtraBuffersOrigin(int batchSize, std::vector& buffersPtr, + std::vector& nullsPtr); int dumpBufferAfterAgg(int groupBySize, int aggExprsSize, const std::vector& keys, const std::vector& results, int64_t* oriBufferPtr, int64_t* oriNullsPtr, int32_t offset, int32_t length); @@ -168,5 +170,10 @@ class Reader { int32_t dumpAggCursor = 0; int64_t * buffersPtrNew_; int64_t * nullsPtrNew_; + std::vector *buffersPtrNext; + std::vector *nullsPtrNext; + int rowsToReadNext=0; + int haveRead = 1; + int multiThreadRowsRet =0; }; } // namespace ape