diff --git a/oap-ape/ape-native/src/reader.cc b/oap-ape/ape-native/src/reader.cc index a1073e2f8..aae91cdbb 100644 --- a/oap-ape/ape-native/src/reader.cc +++ b/oap-ape/ape-native/src/reader.cc @@ -17,14 +17,18 @@ #include #include - +#include +#include #include "arrow/util/cpu_info.h" #undef NDEBUG #include +#include #include "src/reader.h" + + namespace ape { Reader::Reader() {} @@ -148,43 +152,29 @@ void convertBitMap(uint8_t* srcBitMap, uint8_t* dstByteMap, int len) { } int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr_) { - // Pre buffer row groups. - // This is not called in `init` because `requiredColumnIndex` - // may be changed by `setFilter` after `init`. - preBufferRowGroups(); - // Init grow group readers. - // This should be called after preBufferRowGroups - initRowGroupReaders(); - - // this reader have read all rows +std::cout<<"NEW BATCH!!!!!!!!!!!!!!!!!!!!!!!\n"; +std::cout<<"aggre = "<= totalRows && dumpAggCursor == 0) { return -1; } + std::cout<<"161currentRowGroup before check = "< buffersPtr(initRequiredColumnCount); std::vector nullsPtr(initRequiredColumnCount); - - // Not all input buffers can be used for column data loading. - // espeically when agg pushing down is enabled. - // E.g. input buffers could be in types of "tbl_col_a, sum(tbl_col_b)", - // in which only the first buffer can be used for column data loading. for (int i = 0; i < usedInitBufferIndex.size(); i++) { buffersPtr[i] = buffersPtr_[usedInitBufferIndex[i]]; nullsPtr[i] = nullsPtr_[usedInitBufferIndex[i]]; } - - allocateExtraBuffers(batchSize, buffersPtr, nullsPtr); - + allocateExtraBuffersOrigin(batchSize, buffersPtr, nullsPtr); currentBatchSize = batchSize; int rowsRet = 0; - if (aggExprs.size() == 0) { // will not do agg - int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); - totalRowsRead += rowsToRead; - ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; - rowsRet = doFilter(rowsToRead, buffersPtr, nullsPtr); - } else { - if (dumpAggCursor == 0) { // will read a whole RowGroup and do agg + if (dumpAggCursor == 0) { // will read a whole RowGroup and do agg results.resize(aggExprs.size()); for (int i = 0; i < aggExprs.size(); i++) { std::vector nullVector(1); @@ -240,10 +230,358 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr dumpAggCursor += batchSize; } } + ARROW_LOG(DEBUG) << "ret rows " << rowsRet; + return rowsRet; + } else { +// no agg + //if first batch + if (totalRowsRead == 0){ + std::cout<<"this is the first BATCH!!!!!!!!!!!!!!!!!!!!!!!\n"; + preBufferRowGroups(); + initRowGroupReaders(); + if (totalRowsRead >= totalRows && dumpAggCursor == 0) { + std::cout<<"________________totalRowsRead = "< buffersPtr(initRequiredColumnCount); + std::vector nullsPtr(initRequiredColumnCount); + for (int i = 0; i < usedInitBufferIndex.size(); i++) { + buffersPtr[i] = buffersPtr_[usedInitBufferIndex[i]]; + nullsPtr[i] = nullsPtr_[usedInitBufferIndex[i]]; + } + std::cout<<"FIRST BATCH NORMAL ALLOCATE\n"; + allocateExtraBuffersOrigin(batchSize, buffersPtr, nullsPtr); + currentBatchSize = batchSize; + int rowsRet = 0; + int rowsToRead = doReadBatch(batchSize, buffersPtr, nullsPtr); + totalRowsRead += rowsToRead; + ARROW_LOG(DEBUG) << "total rows read yet: " << totalRowsRead; + rowsRet = doFilter(rowsToRead, buffersPtr, nullsPtr); + //read next batch + buffersPtrNew_ = new int64_t[initRequiredColumnCount]; + nullsPtrNew_ = new int64_t[initRequiredColumnCount]; + //prepare the buffer + for (int i = 0; i < initRequiredColumnCount; i++) { + std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; + nullsPtrNew_[i] = (int64_t) new bool[batchSize]; + switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { + case parquet::Type::BOOLEAN: + (((buffersPtrNew_))[i]) = (int64_t) new bool[batchSize]; + break; + case parquet::Type::INT32: + (((buffersPtrNew_))[i]) = (int64_t) new int32_t[batchSize]; + break; + case parquet::Type::INT64: + (((buffersPtrNew_))[i]) = (int64_t) new int64_t[batchSize]; + break; + case parquet::Type::INT96: + (((buffersPtrNew_))[i]) = (int64_t) new parquet::Int96[batchSize]; + break; + case parquet::Type::FLOAT: + (((buffersPtrNew_))[i]) = (int64_t) new float[batchSize]; + break; + case parquet::Type::DOUBLE: + (((buffersPtrNew_))[i]) = (int64_t) new double[batchSize]; + break; + case parquet::Type::BYTE_ARRAY: + (((buffersPtrNew_))[i]) = (int64_t) new parquet::ByteArray[batchSize]; + std::cout <<"type size = "<= totalRows && dumpAggCursor == 0) { + std::cout<<"________________totalRowsRead = "<(initRequiredColumnCount); + nullsPtrNext=new std::vector(initRequiredColumnCount); + for (int i = 0; i < usedInitBufferIndex.size(); i++) { + (*buffersPtrNext)[i] = buffersPtrNew_[usedInitBufferIndex[i]]; + (*nullsPtrNext)[i] = nullsPtrNew_[usedInitBufferIndex[i]]; + } + std::cout<<"FIRST BATCH NEW ALLOCATE\n"; + int newLength = allocateExtraBuffersOrigin(batchSize, (*buffersPtrNext), (*nullsPtrNext)); + currentBatchSize = batchSize; + rowsToReadNext = doReadBatch(batchSize, (*buffersPtrNext), (*nullsPtrNext)); + totalRowsRead += rowsToReadNext; + std::cout<<"________________totalRowsRead = "< buffersPtr(initRequiredColumnCount); + std::vector nullsPtr(initRequiredColumnCount); + for (int i = 0; i < usedInitBufferIndex.size(); i++) { + buffersPtr[i] = buffersPtr_[usedInitBufferIndex[i]]; + nullsPtr[i] = nullsPtr_[usedInitBufferIndex[i]]; + } + int newLength = allocateExtraBuffersOrigin(batchSize, buffersPtr, nullsPtr); + currentBatchSize = batchSize; + for (int i=0;ischema()->Column(requiredColumnIndex[i])->physical_type()) { + + case parquet::Type::BOOLEAN: + memcpy((((bool*)(buffersPtr[i]))),(((bool*)((*buffersPtrNext)[i]))),batchSize*sizeof(bool)); + break; + case parquet::Type::INT32: + memcpy((((int32_t*)(buffersPtr[i]))),(((int32_t*)((*buffersPtrNext)[i]))),batchSize*sizeof(int32_t)); + break; + case parquet::Type::INT64: + memcpy((((int64_t*)(buffersPtr[i]))),(((int64_t*)((*buffersPtrNext)[i]))),batchSize*sizeof(int64_t)); + break; + case parquet::Type::INT96: + memcpy((((parquet::Int96*)(buffersPtr[i]))),(((parquet::Int96*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::Int96)); + break; + case parquet::Type::FLOAT: + memcpy((((float*)(buffersPtr[i]))),(((float*)((*buffersPtrNext)[i]))),batchSize*sizeof(float)); + break; + case parquet::Type::DOUBLE: + memcpy((((double*)(buffersPtr[i]))),(((double*)((*buffersPtrNext)[i]))),batchSize*sizeof(double)); + break; + case parquet::Type::BYTE_ARRAY: + memcpy((((parquet::ByteArray*)(buffersPtr[i]))),(((parquet::ByteArray*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::ByteArray)); + break; + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + memcpy((((parquet::FixedLenByteArray*)(buffersPtr[i]))),(((parquet::FixedLenByteArray*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::FixedLenByteArray)); + break; + default: + ARROW_LOG(WARNING) << "Unsupported Type!"; + continue; + + } + } + //int rowsRet = doFilter(rowsToReadNext, buffersPtr,nullsPtr); + + preBufferRowGroups(); + initRowGroupReaders(); + if (totalRowsRead >= totalRows && dumpAggCursor == 0) { + int rowsRet = doFilter(rowsToReadNext, buffersPtr,nullsPtr); + std::cout<<"not the first batch but read finished \n"; + std::cout<<"________________totalRowsRead = "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; +// nullsPtrNew_[i] = (int64_t) new bool[batchSize]; +// switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { +// case parquet::Type::BOOLEAN: +// (((buffersPtrNew_))[i]) = (int64_t) new bool[batchSize]; +// break; +// case parquet::Type::INT32: +// (((buffersPtrNew_))[i]) = (int64_t) new int32_t[batchSize]; +// break; +// case parquet::Type::INT64: +// (((buffersPtrNew_))[i]) = (int64_t) new int64_t[batchSize]; +// break; +// case parquet::Type::INT96: +// (((buffersPtrNew_))[i]) = (int64_t) new parquet::Int96[batchSize]; +// break; +// case parquet::Type::FLOAT: +// (((buffersPtrNew_))[i]) = (int64_t) new float[batchSize]; +// break; +// case parquet::Type::DOUBLE: +// (((buffersPtrNew_))[i]) = (int64_t) new double[batchSize]; +// break; +// case parquet::Type::BYTE_ARRAY: +// (((buffersPtrNew_))[i]) = (int64_t) new parquet::ByteArray[batchSize]; +// std::cout <<"type size = "<= totalRows && dumpAggCursor == 0) { +// return -1; +// } +// checkEndOfRowGroup(); +// std::cout<< "checkEndOfRowGroup has done\n"; +// buffersPtrNext=new std::vector(initRequiredColumnCount); +// nullsPtrNext=new std::vector(initRequiredColumnCount); +// std::vector buffersPtrReal(initRequiredColumnCount); +// std::vector nullsPtrReal(initRequiredColumnCount); +// for (int i = 0; i < usedInitBufferIndex.size(); i++) { +// (*buffersPtrNext)[i] = buffersPtrNew_[usedInitBufferIndex[i]]; +// buffersPtrReal[i] = buffersPtr_[usedInitBufferIndex[i]]; +// (*nullsPtrNext)[i] = nullsPtrNew_[usedInitBufferIndex[i]]; +// nullsPtrReal[i] = nullsPtr_[usedInitBufferIndex[i]]; +// } +// std::cout<schema()->Column(requiredColumnIndex[i])->physical_type()) { + +// case parquet::Type::BOOLEAN: +// memcpy((((bool*)(buffersPtrReal[i]))),(((bool*)((*buffersPtrNext)[i]))),batchSize*sizeof(bool)); +// break; +// case parquet::Type::INT32: +// memcpy((((int32_t*)(buffersPtrReal[i]))),(((int32_t*)((*buffersPtrNext)[i]))),batchSize*sizeof(int32_t)); +// break; +// case parquet::Type::INT64: +// memcpy((((int64_t*)(buffersPtrReal[i]))),(((int64_t*)((*buffersPtrNext)[i]))),batchSize*sizeof(int64_t)); +// break; +// case parquet::Type::INT96: +// memcpy((((parquet::Int96*)(buffersPtrReal[i]))),(((parquet::Int96*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::Int96)); +// break; +// case parquet::Type::FLOAT: +// memcpy((((float*)(buffersPtrReal[i]))),(((float*)((*buffersPtrNext)[i]))),batchSize*sizeof(float)); +// break; +// case parquet::Type::DOUBLE: +// memcpy((((double*)(buffersPtrReal[i]))),(((double*)((*buffersPtrNext)[i]))),batchSize*sizeof(double)); +// break; +// case parquet::Type::BYTE_ARRAY: +// memcpy((((parquet::ByteArray*)(buffersPtrReal[i]))),(((parquet::ByteArray*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::ByteArray)); +// break; +// case parquet::Type::FIXED_LEN_BYTE_ARRAY: +// memcpy((((parquet::FixedLenByteArray*)(buffersPtrReal[i]))),(((parquet::FixedLenByteArray*)((*buffersPtrNext)[i]))),batchSize*sizeof(parquet::FixedLenByteArray)); +// break; +// default: +// ARROW_LOG(WARNING) << "Unsupported Type!"; +// continue; + +// } +// } +// std::cout<< "before copy has done\n"; +// //COPY THE BUFFER TO THE GIVEN BUFFER WHICH WILL BE RETURN TO THE JAVA PART +// for (int i = 0; i < initRequiredColumnCount; i++) { +// std::cout <<"type: "<schema()->Column(requiredColumnIndex[i])->physical_type()<<"\n"; +// delete((bool*)nullsPtrNew_[i]); +// switch (fileMetaData->schema()->Column(requiredColumnIndex[i])->physical_type()) { + +// case parquet::Type::BOOLEAN: +// delete((bool*)buffersPtrNew_[i]); +// break; +// case parquet::Type::INT32: +// delete((int32_t*)buffersPtrNew_[i]); +// break; +// case parquet::Type::INT64: +// delete((int64_t*)buffersPtrNew_[i]); +// break; +// case parquet::Type::INT96: +// delete((parquet::Int96*)buffersPtrNew_[i]); +// break; +// case parquet::Type::FLOAT: +// delete((float*)buffersPtrNew_[i]); +// break; +// case parquet::Type::DOUBLE: +// delete((double*)buffersPtrNew_[i]); +// break; +// case parquet::Type::BYTE_ARRAY: +// delete((parquet::ByteArray*)buffersPtrNew_[i]); +// break; +// case parquet::Type::FIXED_LEN_BYTE_ARRAY: +// delete((parquet::FixedLenByteArray*)buffersPtrNew_[i]); +// break; +// default: +// ARROW_LOG(WARNING) << "Unsupported Type!"; +// continue; +// } +// } +// delete(buffersPtrNew_); +// delete(nullsPtrNew_); +// std::cout<< " copy has done, copy size = "<& buffersPtr, @@ -253,12 +591,15 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, std::vector repLevel(rowsToRead); std::vector nullBitMap(rowsToRead); ARROW_LOG(DEBUG) << "will read " << rowsToRead << " rows"; + std::cout<< "____349_____ \n"; for (int i = 0; i < columnReaders.size(); i++) { int64_t levelsRead = 0, valuesRead = 0, nullCount = 0; int rows = 0; int tmpRows = 0; + std::cout<< "____354_____ \n"; // ReadBatchSpaced API will return rows left in a data page while (rows < rowsToRead) { + std::cout<< "____357_____ \n"; // TODO: refactor. it's ugly, but didn't find some better way. switch (typeVector[i]) { case parquet::Type::BOOLEAN: { @@ -358,7 +699,9 @@ int Reader::doReadBatch(int batchSize, std::vector& buffersPtr, ARROW_LOG(WARNING) << "Unsupported Type!"; break; } + std::cout<< "____457_____ \n"; convertBitMap(nullBitMap.data(), (uint8_t*)nullsPtr[i] + rows, tmpRows); + std::cout<< "____459_____ \n"; rows += tmpRows; } assert(rowsToRead == rows); @@ -375,8 +718,10 @@ int Reader::doFilter(int batchSize, std::vector& buffersPtr, int rowsRet = filterExpression->ExecuteWithParam(batchSize, buffersPtr, nullsPtr, tmp); filterTime += std::chrono::steady_clock::now() - start; + multiThreadRowsRet=rowsRet; return rowsRet; } + multiThreadRowsRet=batchSize; return batchSize; } @@ -450,15 +795,53 @@ int Reader::dumpBufferAfterAgg(int groupBySize, int aggExprsSize, return 0; } +int Reader::allocateExtraBuffersOrigin(int batchSize, std::vector& buffersPtr, + std::vector& nullsPtr) { + if (filterExpression) { + allocateFilterBuffers(batchSize); + } + + if (aggExprs.size()) { // todo: group by agg size + allocateAggBuffers(batchSize); + } + + int filterBufferCount = filterDataBuffers.size(); + int aggBufferCount = aggDataBuffers.size(); + + if (filterBufferCount > 0 || aggBufferCount > 0) { + ARROW_LOG(DEBUG) << "use extra filter buffers count: " << filterBufferCount + << "use extra agg buffers count: " << aggBufferCount; + + buffersPtr.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); + nullsPtr.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); + + for (int i = 0; i < filterBufferCount; i++) { + buffersPtr[initRequiredColumnCount + i] = (int64_t)filterDataBuffers[i]; + nullsPtr[initRequiredColumnCount + i] = (int64_t)filterNullBuffers[i]; + } + for (int i = 0; i < aggBufferCount; i++) { + buffersPtr[initRequiredColumnCount + filterBufferCount + i] = + (int64_t)aggDataBuffers[i]; + nullsPtr[initRequiredColumnCount + filterBufferCount + i] = + (int64_t)aggNullBuffers[i]; + } + } + return initRequiredColumnCount + filterBufferCount + aggBufferCount; +} int Reader::allocateExtraBuffers(int batchSize, std::vector& buffersPtr, - std::vector& nullsPtr) { + std::vector& nullsPtr, std::vector& buffersPtrReal, + std::vector& nullsPtrReal) { if (filterExpression) { + std::cout<< "before allocateFilterBuffers has done\n"; allocateFilterBuffers(batchSize); + std::cout<< "allocateFilterBuffers has done\n"; } if (aggExprs.size()) { // todo: group by agg size + std::cout<< "before allocateAggBuffers has done\n"; allocateAggBuffers(batchSize); + std::cout<< "allocateAggBuffers has done\n"; } int filterBufferCount = filterDataBuffers.size(); @@ -470,10 +853,14 @@ int Reader::allocateExtraBuffers(int batchSize, std::vector& buffersPtr buffersPtr.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); nullsPtr.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); + buffersPtrReal.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); + nullsPtrReal.resize(initRequiredColumnCount + filterBufferCount + aggBufferCount); for (int i = 0; i < filterBufferCount; i++) { buffersPtr[initRequiredColumnCount + i] = (int64_t)filterDataBuffers[i]; nullsPtr[initRequiredColumnCount + i] = (int64_t)filterNullBuffers[i]; + buffersPtrReal[initRequiredColumnCount + i] = (int64_t)filterDataBuffers[i]; + nullsPtrReal[initRequiredColumnCount + i] = (int64_t)filterNullBuffers[i]; } for (int i = 0; i < aggBufferCount; i++) { @@ -481,11 +868,15 @@ int Reader::allocateExtraBuffers(int batchSize, std::vector& buffersPtr (int64_t)aggDataBuffers[i]; nullsPtr[initRequiredColumnCount + filterBufferCount + i] = (int64_t)aggNullBuffers[i]; + buffersPtrReal[initRequiredColumnCount + filterBufferCount + i] =(int64_t)aggDataBuffers[i]; + nullsPtrReal[initRequiredColumnCount + filterBufferCount + i] =(int64_t)aggNullBuffers[i]; } } return initRequiredColumnCount + filterBufferCount + aggBufferCount; } + + bool Reader::hasNext() { return dumpAggCursor > 0 || columnReaders[0]->HasNext(); } bool Reader::skipNextRowGroup() { @@ -627,57 +1018,74 @@ void Reader::setFilter(std::string filterJsonStr) { } int Reader::allocateFilterBuffers(int batchSize) { + std::cout<< "751\n"; if (!filterReset && batchSize <= currentBatchSize) { + std::cout<< "753\n"; return 0; } filterReset = false; - +std::cout<< "757\n"; // free current filter buffers freeFilterBuffers(); - +std::cout<< "760\n"; // allocate new filter buffers int extraBufferNum = 0; for (int i = initRequiredColumnCount; i < initPlusFilterRequiredColumnCount; i++) { + std::cout<< "764\n"; int columnIndex = requiredColumnIndex[i]; // allocate memory buffer char* dataBuffer; switch (fileMetaData->schema()->Column(columnIndex)->physical_type()) { case parquet::Type::BOOLEAN: dataBuffer = (char*)new bool[batchSize]; + std::cout<< "770\n"; break; case parquet::Type::INT32: dataBuffer = (char*)new int32_t[batchSize]; + std::cout<< "775\n"; break; case parquet::Type::INT64: dataBuffer = (char*)new int64_t[batchSize]; + std::cout<< "779\n"; break; case parquet::Type::INT96: dataBuffer = (char*)new parquet::Int96[batchSize]; + std::cout<< "783\n"; break; case parquet::Type::FLOAT: dataBuffer = (char*)new float[batchSize]; + std::cout<< "787\n"; break; case parquet::Type::DOUBLE: dataBuffer = (char*)new double[batchSize]; + std::cout<< "791\n"; break; case parquet::Type::BYTE_ARRAY: dataBuffer = (char*)new parquet::ByteArray[batchSize]; + std::cout<< "795\n"; break; case parquet::Type::FIXED_LEN_BYTE_ARRAY: dataBuffer = (char*)new parquet::FixedLenByteArray[batchSize]; + std::cout<< "799\n"; break; default: ARROW_LOG(WARNING) << "Unsupported Type!"; + std::cout<< "803\n"; continue; } char* nullBuffer = new char[batchSize]; + std::cout<< "808\n"; filterDataBuffers.push_back(dataBuffer); + std::cout<< "810\n"; filterNullBuffers.push_back(nullBuffer); + std::cout<< "812\n"; extraBufferNum++; + std::cout<< "815\n"; } ARROW_LOG(INFO) << "create extra filter buffers count: " << extraBufferNum; + std::cout<< "819\n"; return extraBufferNum; } diff --git a/oap-ape/ape-native/src/reader.h b/oap-ape/ape-native/src/reader.h index 17930ac86..460393c4b 100644 --- a/oap-ape/ape-native/src/reader.h +++ b/oap-ape/ape-native/src/reader.h @@ -67,6 +67,7 @@ class Reader { void setPreBufferEnabled(bool isEnabled); static bool isNativeEnabled(); + private: void convertSchema(std::string requiredColumnName); @@ -94,8 +95,10 @@ class Reader { std::vector& nullsPtr); int allocateExtraBuffers(int batchSize, std::vector& buffersPtr, - std::vector& nullsPtr); - + std::vector& nullsPtr, std::vector& buffersPtrReal, + std::vector& nullsPtrReal) ; + int allocateExtraBuffersOrigin(int batchSize, std::vector& buffersPtr, + std::vector& nullsPtr); int dumpBufferAfterAgg(int groupBySize, int aggExprsSize, const std::vector& keys, const std::vector& results, int64_t* oriBufferPtr, int64_t* oriNullsPtr, int32_t offset, int32_t length); @@ -138,7 +141,8 @@ class Reader { std::vector filterColumnNames; std::vector filterDataBuffers; std::vector filterNullBuffers; - + std::vector filterDataBuffersReal; + std::vector filterNullBuffersReal; int initPlusFilterRequiredColumnCount = 0; bool aggReset = false; std::vector aggColumnNames; @@ -164,5 +168,12 @@ class Reader { ApeHashMap map; int32_t dumpAggCursor = 0; + int64_t * buffersPtrNew_; + int64_t * nullsPtrNew_; + std::vector *buffersPtrNext; + std::vector *nullsPtrNext; + int rowsToReadNext=0; + int haveRead = 1; + int multiThreadRowsRet =0; }; } // namespace ape