From 477d068a6e962f8a76a7bce5adb6767e14ec4bbd Mon Sep 17 00:00:00 2001
From: zhaoyaqi
Date: Tue, 22 Nov 2022 01:06:58 -0500
Subject: [PATCH] Add Intel® IAA/QPL-based Parquet RLE decode (QPL is vendored
 as a third-party toolchain)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 cpp/CMakeLists.txt                            |  13 +
 cpp/cmake_modules/DefineOptions.cmake         |   2 +
 cpp/cmake_modules/ThirdpartyToolchain.cmake   |  52 +++-
 cpp/src/arrow/CMakeLists.txt                  |   1 +
 cpp/src/arrow/util/bit_stream_utils.h         |   9 +
 cpp/src/arrow/util/qpl_job_pool.cc            | 120 +++++++++
 cpp/src/arrow/util/qpl_job_pool.h             |  70 ++++++
 cpp/src/arrow/util/rle_encoding.h             |  89 +++++++
 cpp/src/parquet/CMakeLists.txt                |   8 +-
 cpp/src/parquet/arrow/parquet_reader_test.cc  | 205 +++++++++++++++
 cpp/src/parquet/arrow/qpl_reader_benchmark.cc | 237 ++++++++++++++++++
 cpp/src/parquet/encoding.cc                   |   5 +
 cpp/src/parquet/encoding.h                    |  12 +-
 cpp/submodules/qpl                            |   1 +
 14 files changed, 821 insertions(+), 3 deletions(-)
 create mode 100644 cpp/src/arrow/util/qpl_job_pool.cc
 create mode 100644 cpp/src/arrow/util/qpl_job_pool.h
 create mode 100644 cpp/src/parquet/arrow/parquet_reader_test.cc
 create mode 100644 cpp/src/parquet/arrow/qpl_reader_benchmark.cc
 create mode 160000 cpp/submodules/qpl

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index e6d0ed4e81f0c..945b2dbd11d83 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -347,6 +347,15 @@ if(UNIX)
   add_custom_target(iwyu-all ${BUILD_SUPPORT_DIR}/iwyu/iwyu.sh all)
 endif(UNIX)
 
+if(ARROW_WITH_QPL)
+  add_definitions(-DENABLE_QPL_ANALYSIS)
+  list(APPEND ARROW_STATIC_LINK_LIBS qpl::qpl)
+  if(QPL_SOURCE STREQUAL "SYSTEM")
+    list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS qpl::qpl)
+  endif()
+  message(STATUS "QPL support enabled: defining ENABLE_QPL_ANALYSIS")
+endif()
+
 # datetime code used by iOS requires zlib support
 if(IOS)
   set(ARROW_WITH_ZLIB ON)
@@ -645,6 +654,10 @@ set(ARROW_SHARED_INSTALL_INTERFACE_LIBS)
 set(ARROW_STATIC_LINK_LIBS arrow::flatbuffers arrow::hadoop)
 set(ARROW_STATIC_INSTALL_INTERFACE_LIBS)
 
+if(ARROW_WITH_QPL)
+  list(APPEND ARROW_SHARED_LINK_LIBS qpl::qpl)
+endif()
+
 if(ARROW_USE_BOOST)
   list(APPEND ARROW_SHARED_LINK_LIBS Boost::headers)
   list(APPEND ARROW_STATIC_LINK_LIBS Boost::headers)
diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake
index 040a6f582961d..7b399a5a39b47 100644
--- a/cpp/cmake_modules/DefineOptions.cmake
+++ b/cpp/cmake_modules/DefineOptions.cmake
@@ -375,6 +375,8 @@ takes precedence over ccache if a storage backend is configured" ON)
 
   define_option(ARROW_PLASMA "Build the plasma object store along with Arrow" OFF)
 
+  define_option(ARROW_WITH_QPL "Enable Intel® Query Processing Library" ON)
+
   define_option(ARROW_PYTHON
                 "Build some components needed by PyArrow.;\
 (This is a deprecated option. Use CMake presets instead.)"
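When `ARROW_WITH_QPL=ON`, the block above defines `ENABLE_QPL_ANALYSIS` for the whole build, and every QPL code path in this patch is guarded by that macro. A minimal, self-contained sketch of the guard pattern (the `kHasQpl` constant is hypothetical, for illustration only):

```cpp
#include <iostream>

// ENABLE_QPL_ANALYSIS is defined by CMake when ARROW_WITH_QPL=ON;
// kHasQpl is a hypothetical constant used only for this sketch.
#ifdef ENABLE_QPL_ANALYSIS
constexpr bool kHasQpl = true;
#else
constexpr bool kHasQpl = false;
#endif

int main() {
  std::cout << "QPL-accelerated RLE decode: " << (kHasQpl ? "on" : "off")
            << std::endl;
  return 0;
}
```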
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
index 364e631ceaba0..389b8274b4777 100644
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -72,7 +72,8 @@ set(ARROW_THIRDPARTY_DEPENDENCIES
   utf8proc
   xsimd
   ZLIB
-  zstd)
+  zstd
+  QPL)
 
 # For backward compatibility. We use "BOOST_SOURCE" if "Boost_SOURCE"
 # isn't specified and "BOOST_SOURCE" is specified.
@@ -203,6 +204,8 @@ macro(build_dependency DEPENDENCY_NAME)
     build_zlib()
   elseif("${DEPENDENCY_NAME}" STREQUAL "zstd")
     build_zstd()
+  elseif("${DEPENDENCY_NAME}" STREQUAL "QPL")
+    build_qpl()
   else()
     message(FATAL_ERROR "Unknown thirdparty dependency to build: ${DEPENDENCY_NAME}")
   endif()
@@ -2203,6 +2206,53 @@ if(ARROW_WITH_RAPIDJSON)
   endif()
 endif()
 
+macro(build_qpl)
+  message(STATUS "Building QPL from source")
+  set(QPL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qpl_ep/src/qpl_ep-install")
+  if(MSVC)
+    set(QPL_STATIC_LIB_NAME qplstatic.lib)
+  else()
+    set(QPL_STATIC_LIB_NAME libqpl.a)
+  endif()
+  set(QPL_STATIC_LIB "${QPL_PREFIX}/lib64/${QPL_STATIC_LIB_NAME}")
+  set(QPL_CMAKE_ARGS ${EP_COMMON_CMAKE_ARGS} -DCMAKE_BUILD_TYPE=Release
+                     "-DCMAKE_INSTALL_PREFIX=${QPL_PREFIX}")
+
+  ExternalProject_Add(qpl_ep
+                      ${EP_LOG_OPTIONS}
+                      GIT_REPOSITORY https://github.com/intel/qpl.git
+                      GIT_TAG v0.2.1
+                      GIT_SUBMODULES_RECURSE TRUE
+                      BUILD_BYPRODUCTS "${QPL_STATIC_LIB}"
+                      CMAKE_ARGS ${QPL_CMAKE_ARGS})
+
+  file(MAKE_DIRECTORY "${QPL_PREFIX}/include")
+
+  add_library(qpl::qpl STATIC IMPORTED)
+  set(QPL_LIBRARIES ${QPL_STATIC_LIB})
+  set(QPL_INCLUDE_DIRS "${QPL_PREFIX}/include")
+  set_target_properties(qpl::qpl
+                        PROPERTIES IMPORTED_LOCATION ${QPL_LIBRARIES}
+                                   INTERFACE_INCLUDE_DIRECTORIES ${QPL_INCLUDE_DIRS})
+
+  add_dependencies(toolchain qpl_ep)
+  add_dependencies(qpl::qpl qpl_ep)
+
+  list(APPEND ARROW_BUNDLED_STATIC_LIBS qpl::qpl)
+  set(QPL_VENDORED TRUE)
+endmacro()
+
+if(ARROW_WITH_QPL)
+  resolve_dependency(QPL PC_PACKAGE_NAMES qpl)
+endif()
+
 macro(build_xsimd)
   message(STATUS "Building xsimd from source")
   set(XSIMD_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/xsimd_ep/src/xsimd_ep-install")
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 099a862376041..18ba55ce0b256 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -228,6 +228,7 @@ set(ARROW_SRCS
     util/uri.cc
     util/utf8.cc
     util/value_parsing.cc
+    util/qpl_job_pool.cc
     vendored/base64.cpp
     vendored/datetime/tz.cpp
     vendored/double-conversion/bignum.cc
diff --git a/cpp/src/arrow/util/bit_stream_utils.h b/cpp/src/arrow/util/bit_stream_utils.h
index 2f70c286503fb..ef97db21bbd2e 100644
--- a/cpp/src/arrow/util/bit_stream_utils.h
+++ b/cpp/src/arrow/util/bit_stream_utils.h
@@ -190,6 +190,15 @@ class BitReader {
   /// Maximum byte length of a vlq encoded int64
   static constexpr int kMaxVlqByteLengthForInt64 = 10;
 
+  /// Returns the underlying buffer including the RLE bit-width byte that
+  /// precedes it, which QPL's qpl_p_parquet_rle parser expects to see.
+  const uint8_t* getBuffer() { return buffer_ - 1; }
+
+  /// Length of the buffer returned by getBuffer(), including that byte.
+  int getBufferLen() { return max_bytes_ + 1; }
+
  private:
   const uint8_t* buffer_;
   int max_bytes_;
diff --git a/cpp/src/arrow/util/qpl_job_pool.cc b/cpp/src/arrow/util/qpl_job_pool.cc
new file mode 100644
index 0000000000000..23e7dcbf2f1ee
--- /dev/null
+++ b/cpp/src/arrow/util/qpl_job_pool.cc
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+
+#ifdef ENABLE_QPL_ANALYSIS
+#include "arrow/util/qpl_job_pool.h"
+
+namespace arrow {
+namespace internal {
+namespace detail {
+
+std::array<qpl_job*, QplJobHWPool::MAX_JOB_NUMBER> QplJobHWPool::hw_job_ptr_pool;
+std::array<std::atomic_bool, QplJobHWPool::MAX_JOB_NUMBER> QplJobHWPool::job_ptr_locks;
+bool QplJobHWPool::job_pool_ready = false;
+std::unique_ptr<uint8_t[]> QplJobHWPool::hw_jobs_buffer;
+
+QplJobHWPool& QplJobHWPool::instance() {
+  static QplJobHWPool pool;
+  return pool;
+}
+
+QplJobHWPool::QplJobHWPool()
+    : random_engine(std::random_device()()), distribution(0, MAX_JOB_NUMBER - 1) {
+  // Result intentionally ignored; acquireJob() retries initialization on demand.
+  (void)initQPLJob();
+}
+
+QplJobHWPool::~QplJobHWPool() {
+  for (uint32_t i = 0; i < MAX_JOB_NUMBER; ++i) {
+    if (hw_job_ptr_pool[i]) {
+      qpl_fini_job(hw_job_ptr_pool[i]);
+      hw_job_ptr_pool[i] = nullptr;
+    }
+  }
+  job_pool_ready = false;
+}
+
+arrow::Status QplJobHWPool::initQPLJob() {
+  uint32_t job_size = 0;
+
+  /// Query the size required for a single qpl job object.
+  qpl_get_job_size(qpl_path_hardware, &job_size);
+  /// Allocate one contiguous buffer that backs all job objects.
+  hw_jobs_buffer = std::make_unique<uint8_t[]>(job_size * MAX_JOB_NUMBER);
+  /// Initialize each job object at its offset within the buffer and record
+  /// its pointer in the pool.
+  for (uint32_t index = 0; index < MAX_JOB_NUMBER; ++index) {
+    qpl_job* qpl_job_ptr =
+        reinterpret_cast<qpl_job*>(hw_jobs_buffer.get() + index * job_size);
+    if (qpl_init_job(qpl_path_hardware, qpl_job_ptr) != QPL_STS_OK) {
+      job_pool_ready = false;
+      return arrow::Status::Invalid(
+          "Initialization of hardware IAA failed. Please check whether the "
+          "Intel In-Memory Analytics Accelerator (IAA) is properly set up.");
+    }
+    hw_job_ptr_pool[index] = qpl_job_ptr;
+    unLockJob(index);
+  }
+
+  job_pool_ready = true;
+  return arrow::Status::OK();
+}
+
+qpl_job* QplJobHWPool::acquireJob(uint32_t& job_id) {
+  if (!isJobPoolReady() && !initQPLJob().ok()) {
+    return nullptr;
+  }
+
+  uint32_t retry = 0;
+  auto index = distribution(random_engine);
+  while (!tryLockJob(index)) {
+    index = distribution(random_engine);
+    retry++;
+    if (retry > MAX_JOB_NUMBER) {
+      return nullptr;
+    }
+  }
+  /// Encode the slot in job_id so that releaseJob() can recover the index.
+  job_id = MAX_JOB_NUMBER - index;
+  return hw_job_ptr_pool[index];
+}
+
+void QplJobHWPool::releaseJob(uint32_t job_id) {
+  if (isJobPoolReady()) {
+    unLockJob(MAX_JOB_NUMBER - job_id);
+  }
+}
+
+bool QplJobHWPool::tryLockJob(uint32_t index) {
+  bool expected = false;
+  if (index >= MAX_JOB_NUMBER) {
+    return false;
+  }
+  return job_ptr_locks[index].compare_exchange_strong(expected, true);
+}
+
+void QplJobHWPool::unLockJob(uint32_t index) {
+  if (index >= MAX_JOB_NUMBER) {
+    return;
+  }
+  job_ptr_locks[index].store(false);
+}
+
+}  // namespace detail
+}  // namespace internal
+}  // namespace arrow
+#endif
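A caller borrows one of the 512 pre-initialized job slots, runs a QPL operation on it, and must return the slot afterwards. A minimal usage sketch against the pool above (error handling trimmed; `ConfigureAndRun` is a hypothetical helper):

```cpp
#ifdef ENABLE_QPL_ANALYSIS
#include "arrow/util/qpl_job_pool.h"

using arrow::internal::detail::QplJobHWPool;

void ConfigureAndRun() {
  uint32_t job_id = 0;
  qpl_job* job = QplJobHWPool::instance().acquireJob(job_id);
  if (job == nullptr) {
    // Pool not ready (no IAA hardware) or all slots busy: a caller should
    // fall back to a software path instead of blocking.
    return;
  }
  // ... fill job->op, job->next_in_ptr, etc., then qpl_execute_job(job) ...
  QplJobHWPool::releaseJob(job_id);  // always return the slot
}
#endif
```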
diff --git a/cpp/src/arrow/util/qpl_job_pool.h b/cpp/src/arrow/util/qpl_job_pool.h
new file mode 100644
index 0000000000000..4e8a8fe3ace98
--- /dev/null
+++ b/cpp/src/arrow/util/qpl_job_pool.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <array>
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <random>
+#include <vector>
+
+#ifdef ENABLE_QPL_ANALYSIS
+#include <qpl/qpl.h>
+
+#include "arrow/status.h"
+
+namespace arrow {
+namespace internal {
+namespace detail {
+
+/// QplJobHWPool is a resource pool that provides qpl_job objects.
+/// A job object stores the context needed while offloading a
+/// compression/analytics job to the IAA hardware accelerator.
+class QplJobHWPool {
+ public:
+  QplJobHWPool();
+  ~QplJobHWPool();
+
+  static QplJobHWPool& instance();
+
+  qpl_job* acquireJob(uint32_t& job_id);
+  static void releaseJob(uint32_t job_id);
+  static const bool& isJobPoolReady() { return job_pool_ready; }
+
+ private:
+  static bool tryLockJob(uint32_t index);
+  static void unLockJob(uint32_t index);
+  static arrow::Status initQPLJob();
+
+  /// Maximum number of jobs the IAA hardware can run in parallel.
+  static constexpr auto MAX_JOB_NUMBER = 512;
+  /// Single contiguous buffer that backs all job objects.
+  static std::unique_ptr<uint8_t[]> hw_jobs_buffer;
+  /// Pointers to the individual job objects inside hw_jobs_buffer.
+  static std::array<qpl_job*, MAX_JOB_NUMBER> hw_job_ptr_pool;
+  /// One lock per job slot.
+  static std::array<std::atomic_bool, MAX_JOB_NUMBER> job_ptr_locks;
+  static bool job_pool_ready;
+  std::mt19937 random_engine;
+  std::uniform_int_distribution<uint32_t> distribution;
+};
+}  // namespace detail
+}  // namespace internal
+}  // namespace arrow
+#endif
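The decoder added next drives QPL's `qpl_op_extract` operation with the `qpl_p_parquet_rle` parser, which consumes Parquet RLE/bit-packed data directly (first byte is the bit width) and emits widened indices. Isolated from Arrow, roughly the same job configuration looks like this; a sketch only, using the software path for illustration, with hypothetical buffer arguments:

```cpp
#include <qpl/qpl.h>

#include <cstdint>
#include <vector>

// Sketch: decode n RLE-encoded dictionary indices with QPL's extract op.
// Assumes src/src_len point at Parquet RLE data preceded by its bit-width
// byte, as qpl_p_parquet_rle expects.
bool ExtractRleIndices(uint8_t* src, uint32_t src_len, uint32_t n,
                       uint8_t bit_width, std::vector<uint8_t>* out) {
  uint32_t size = 0;
  qpl_get_job_size(qpl_path_software, &size);
  std::vector<uint8_t> job_buffer(size);
  qpl_job* job = reinterpret_cast<qpl_job*>(job_buffer.data());
  if (qpl_init_job(qpl_path_software, job) != QPL_STS_OK) return false;

  job->op = qpl_op_extract;         // keep elements [param_low, param_high)
  job->parser = qpl_p_parquet_rle;  // input is Parquet RLE/bit-packed
  job->src1_bit_width = bit_width;
  job->param_low = 0;
  job->param_high = n;
  job->num_input_elements = n;
  job->out_bit_width = qpl_ow_8;    // widen each index to one byte
  job->next_in_ptr = src;
  job->available_in = src_len;
  out->resize(n);
  job->next_out_ptr = out->data();
  job->available_out = static_cast<uint32_t>(out->size());

  const bool ok = qpl_execute_job(job) == QPL_STS_OK;
  qpl_fini_job(job);
  return ok;
}
```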
diff --git a/cpp/src/arrow/util/rle_encoding.h b/cpp/src/arrow/util/rle_encoding.h
index cc90f658f0c7c..84e2f60ec4dff 100644
--- a/cpp/src/arrow/util/rle_encoding.h
+++ b/cpp/src/arrow/util/rle_encoding.h
@@ -31,6 +31,12 @@
 #include "arrow/util/bit_util.h"
 #include "arrow/util/macros.h"
 
+#ifdef ENABLE_QPL_ANALYSIS
+#include <qpl/qpl.h>
+
+#include <vector>
+#include "arrow/util/qpl_job_pool.h"
+#endif
+
 namespace arrow {
 namespace util {
 
@@ -129,6 +135,12 @@ class RleDecoder {
   int GetBatchWithDict(const T* dictionary, int32_t dictionary_length, T* values,
                        int batch_size);
 
+#ifdef ENABLE_QPL_ANALYSIS
+  template <typename T>
+  int GetBatchWithDictIAA(const T* dictionary, int32_t dictionary_length, T* values,
+                          int batch_size);
+#endif
+
   /// Like GetBatchWithDict but add spacing for null entries
   ///
   /// Null entries will be zero-initialized in `values` to avoid leaking
@@ -598,6 +610,83 @@ inline int RleDecoder::GetBatchWithDict(const T* dictionary, int32_t dictionary_
   return values_read;
 }
 
+#ifdef ENABLE_QPL_ANALYSIS
+template <typename T>
+inline int RleDecoder::GetBatchWithDictIAA(const T* dictionary,
+                                           int32_t dictionary_length, T* values,
+                                           int batch_size) {
+  if (bit_width_ == 1) {
+    // Offloading 1-bit indices does not pay off; use the software path.
+    return GetBatchWithDict(dictionary, dictionary_length, values, batch_size);
+  }
+  uint32_t job_id = 0;
+  qpl_job* job = arrow::internal::detail::QplJobHWPool::instance().acquireJob(job_id);
+  if (job == nullptr) {
+    return -1;
+  }
+
+  // Choose the narrowest output width that can hold any dictionary index.
+  int bytes_per_index;
+  if (dictionary_length < 0xFF) {
+    job->out_bit_width = qpl_ow_8;
+    bytes_per_index = 1;
+  } else if (dictionary_length < 0xFFFF) {
+    job->out_bit_width = qpl_ow_16;
+    bytes_per_index = 2;
+  } else {
+    job->out_bit_width = qpl_ow_32;
+    bytes_per_index = 4;
+  }
+  std::vector<uint8_t> destination(batch_size * bytes_per_index, 0);
+
+  // Extract all batch_size RLE-encoded indices from the data page.
+  job->op = qpl_op_extract;
+  job->src1_bit_width = bit_width_;
+  job->param_low = 0;
+  job->param_high = batch_size;
+  job->num_input_elements = batch_size;
+  job->parser = qpl_p_parquet_rle;
+
+  // QPL expects the input to start at the bit-width byte that precedes the
+  // RLE data, hence getBuffer()/getBufferLen() on the bit reader.
+  job->next_in_ptr = const_cast<uint8_t*>(bit_reader_.getBuffer());
+  job->available_in = bit_reader_.getBufferLen();
+  job->next_out_ptr = destination.data();
+  job->available_out = static_cast<uint32_t>(destination.size());
+
+  qpl_status status = qpl_execute_job(job);
+  arrow::internal::detail::QplJobHWPool::instance().releaseJob(job_id);
+  if (status != QPL_STS_OK) {
+    return -1;
+  }
+
+  // Expand the decoded indices through the dictionary.
+  T* out = values;
+  if (bytes_per_index == 4) {
+    const auto* indices = reinterpret_cast<const uint32_t*>(destination.data());
+    for (int j = 0; j < batch_size; j++) {
+      *out++ = dictionary[indices[j]];
+    }
+  } else if (bytes_per_index == 2) {
+    const auto* indices = reinterpret_cast<const uint16_t*>(destination.data());
+    for (int j = 0; j < batch_size; j++) {
+      *out++ = dictionary[indices[j]];
+    }
+  } else {
+    const auto* indices = reinterpret_cast<const uint8_t*>(destination.data());
+    for (int j = 0; j < batch_size; j++) {
+      *out++ = dictionary[indices[j]];
+    }
+  }
+
+  return batch_size;
+}
+#endif
+
 template <typename T>
 inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary,
                                               int32_t dictionary_length, T* out,
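GetBatchWithDictIAA reports failure as -1 (pool exhausted, no accelerator, or a QPL execution error), so callers can treat it as best-effort and fall back to the software decoder. A hedged sketch of such a wrapper, not part of the patch (`DecodeDictBatch` is hypothetical):

```cpp
#include <cstdint>

#include "arrow/util/rle_encoding.h"

// Best-effort IAA decode with software fallback; hypothetical helper.
template <typename T>
int DecodeDictBatch(arrow::util::RleDecoder* decoder, const T* dict,
                    int32_t dict_len, T* values, int batch_size) {
#ifdef ENABLE_QPL_ANALYSIS
  int n = decoder->GetBatchWithDictIAA(dict, dict_len, values, batch_size);
  if (n >= 0) return n;  // accelerator path succeeded
#endif
  return decoder->GetBatchWithDict(dict, dict_len, values, batch_size);
}
```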
diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index dc55ab158d844..73b92f55ff679 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -127,7 +127,6 @@ set(PARQUET_SHARED_TEST_LINK_LIBS arrow_testing_shared ${PARQUET_MIN_TEST_LIBS}
 
 set(PARQUET_STATIC_TEST_LINK_LIBS ${PARQUET_MIN_TEST_LIBS} parquet_static
                                   thrift::thrift ${ARROW_LIBRARIES_FOR_STATIC_TESTS})
-
 #
 # Generated Thrift sources
 set_source_files_properties(src/generated/parquet_types.cpp src/generated/parquet_types.h
@@ -168,6 +167,7 @@ set(PARQUET_SRCS
     murmur3.cc
     "${ARROW_SOURCE_DIR}/src/generated/parquet_constants.cpp"
     "${ARROW_SOURCE_DIR}/src/generated/parquet_types.cpp"
+    "${ARROW_SOURCE_DIR}/src/arrow/util/qpl_job_pool.cc"
     platform.cc
     printer.cc
     properties.cc
@@ -355,6 +355,11 @@ add_parquet_test(arrow-test
                  arrow/arrow_statistics_test.cc
                  test_util.cc)
 
+# add_parquet_test(reader-qpl-test
+#                  SOURCES
+#                  arrow/parquet_reader_test.cc
+#                  test_util.cc)
+
 add_parquet_test(arrow-internals-test
                  SOURCES
                  arrow/path_internal_test.cc
@@ -389,6 +394,7 @@ add_parquet_benchmark(column_io_benchmark)
 add_parquet_benchmark(encoding_benchmark)
 add_parquet_benchmark(level_conversion_benchmark)
 add_parquet_benchmark(arrow/reader_writer_benchmark PREFIX "parquet-arrow")
+add_parquet_benchmark(arrow/qpl_reader_benchmark PREFIX "parquet-arrow")
 
 if(ARROW_WITH_BROTLI)
   add_definitions(-DARROW_WITH_BROTLI)
diff --git a/cpp/src/parquet/arrow/parquet_reader_test.cc b/cpp/src/parquet/arrow/parquet_reader_test.cc
new file mode 100644
index 0000000000000..24a3320ee9034
--- /dev/null
+++ b/cpp/src/parquet/arrow/parquet_reader_test.cc
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifdef _MSC_VER
+#pragma warning(push)
+// Disable forcing value to bool warnings
+#pragma warning(disable : 4800)
+#endif
+
+#include "gtest/gtest.h"
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/io/api.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/config.h"  // for ARROW_CSV definition
+#include "arrow/util/qpl_job_pool.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/test_util.h"
+#include "parquet/test_util.h"
+
+#ifdef ARROW_CSV
+#include "arrow/csv/api.h"
+#endif
+
+using arrow::DataType;
+using arrow::Status;
+using arrow::Table;
+
+namespace arrow {
+
+static size_t countIndicesForType(std::shared_ptr<DataType> type) {
+  if (type->id() == arrow::Type::LIST) {
+    return countIndicesForType(
+        static_cast<arrow::ListType*>(type.get())->value_type());
+  }
+
+  if (type->id() == arrow::Type::STRUCT) {
+    size_t indices = 0;
+    auto* struct_type = static_cast<arrow::StructType*>(type.get());
+    for (int i = 0; i != struct_type->num_fields(); ++i)
+      indices += countIndicesForType(struct_type->field(i)->type());
+    return indices;
+  }
+
+  if (type->id() == arrow::Type::MAP) {
+    auto* map_type = static_cast<arrow::MapType*>(type.get());
+    return countIndicesForType(map_type->key_type()) +
+           countIndicesForType(map_type->item_type());
+  }
+
+  return 1;
+}
+
+static void getFileReaderAndSchema(
+    const std::string& file_name,
+    std::unique_ptr<parquet::arrow::FileReader>& file_reader,
+    std::shared_ptr<arrow::Schema>& schema) {
+  auto file = parquet::test::get_data_file(file_name);
+  std::shared_ptr<arrow::io::ReadableFile> infile;
+  PARQUET_ASSIGN_OR_THROW(
+      infile, arrow::io::ReadableFile::Open(file, arrow::default_memory_pool()));
+  ASSERT_OK(parquet::arrow::OpenFile(std::move(infile),
+                                     arrow::default_memory_pool(), &file_reader));
+  ASSERT_OK(file_reader->GetSchema(&schema));
+}
+
+class ParquetRowGroupReader : public ::testing::Test {
+ public:
+  ParquetRowGroupReader() {}
+
+  void read(const std::string& filename) {
+    if (!file_reader) {
+      prepareReader(filename);
+    }
+
+    size_t parallel = 2;
+    while (row_group_current < row_group_total) {
+      std::vector<int> row_group_indexes;
+      for (; row_group_current < row_group_total &&
+             row_group_indexes.size() < parallel;
+           ++row_group_current) {
+        row_group_indexes.push_back(row_group_current);
+      }
+
+      if (row_group_indexes.empty()) {
+        return;
+      }
+      std::shared_ptr<arrow::Table> table;
+      arrow::Status read_status =
+          file_reader->ReadRowGroups(row_group_indexes, column_indices, &table);
+      ASSERT_OK(read_status);
+    }
+  }
+
+  void prepareReader(const std::string& filename) {
+    std::shared_ptr<arrow::Schema> schema;
+    getFileReaderAndSchema(filename, file_reader, schema);
+
+    row_group_total = file_reader->num_row_groups();
+    row_group_current = 0;
+
+    int index = 0;
+    for (int i = 0; i < schema->num_fields(); ++i) {
+      /// A STRUCT field needs as many indices as it has nested leaf
+      /// elements, so recursively count the number of indices we need
+      /// for this type.
+      int indexes_count = countIndicesForType(schema->field(i)->type());
+
+      for (int j = 0; j != indexes_count; ++j)
+        column_indices.push_back(index + j);
+      index += indexes_count;
+    }
+  }
+
+  std::unique_ptr<parquet::arrow::FileReader> file_reader;
+  int row_group_total = 0;
+  int row_group_current = 0;
+  // Indices of the columns to read from the Parquet file.
+  std::vector<int> column_indices;
+};
+
+TEST_F(ParquetRowGroupReader, ReadParquetFile) {
+  read("lineorder.parquet");
+}
+
+}  // namespace arrow
+
+namespace parquet {
+namespace arrow {
+
+#ifdef ARROW_CSV
+
+class TestArrowReadWithQPL : public ::testing::Test {
+ public:
+  void ReadTableFromParquetFile(const std::string& file_name,
+                                std::shared_ptr<Table>* out) {
+    auto file = test::get_data_file(file_name);
+    auto pool = ::arrow::default_memory_pool();
+    std::unique_ptr<FileReader> parquet_reader;
+    ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false),
+                               &parquet_reader));
+    ASSERT_OK(parquet_reader->ReadTable(out));
+    ASSERT_OK((*out)->ValidateFull());
+  }
+
+  void ReadTableFromCSVFile(const std::string& file_name,
+                            const ::arrow::csv::ConvertOptions& convert_options,
+                            std::shared_ptr<Table>* out) {
+    auto file = test::get_data_file(file_name);
+    ASSERT_OK_AND_ASSIGN(auto input_file, ::arrow::io::ReadableFile::Open(file));
+    ASSERT_OK_AND_ASSIGN(auto csv_reader,
+                         ::arrow::csv::TableReader::Make(
+                             ::arrow::io::default_io_context(), input_file,
+                             ::arrow::csv::ReadOptions::Defaults(),
+                             ::arrow::csv::ParseOptions::Defaults(), convert_options));
+    ASSERT_OK_AND_ASSIGN(*out, csv_reader->Read());
+  }
+};
+
+TEST_F(TestArrowReadWithQPL, ReadSnappyParquetFile) {
+  std::shared_ptr<::arrow::Table> actual_table, expect_table;
+  ReadTableFromParquetFile("lineorder.parquet", &actual_table);
+
+  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+  convert_options.column_types = {{"c_customer_sk", ::arrow::uint8()},
+                                  {"c_current_cdemo_sk", ::arrow::uint8()},
+                                  {"c_current_hdemo_sk", ::arrow::uint8()},
+                                  {"c_current_addr_sk", ::arrow::uint8()},
+                                  {"c_customer_id", ::arrow::binary()}};
+  convert_options.strings_can_be_null = true;
+  ReadTableFromCSVFile("lineorder.csv", convert_options, &expect_table);
+
+  ::arrow::AssertTablesEqual(*actual_table, *expect_table);
+}
+
+#else
+
+class TestArrowReadWithQPL : public ::testing::Test {};
+
+TEST_F(TestArrowReadWithQPL, ReadSnappyParquetFile) {
+  GTEST_SKIP() << "Test needs CSV reader";
+}
+
+#endif
+
+}  // namespace arrow
+}  // namespace parquet
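countIndicesForType mirrors how parquet::arrow flattens nested fields into leaf column indices, so ReadRowGroups() receives one index per Parquet leaf. For example, using the regular Arrow type factories (illustrative only, not part of the patch):

```cpp
#include <memory>

#include "arrow/type.h"

// A struct with a primitive field and a list field flattens to two leaf
// columns, so ReadRowGroups() needs two consecutive column indices for it.
void CountExample() {
  std::shared_ptr<arrow::DataType> nested = arrow::struct_(
      {arrow::field("a", arrow::int32()),
       arrow::field("b", arrow::list(arrow::float32()))});
  // countIndicesForType(nested) == 2: one index for "a", one for the
  // float32 values nested inside "b".
  (void)nested;
}
```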
diff --git a/cpp/src/parquet/arrow/qpl_reader_benchmark.cc b/cpp/src/parquet/arrow/qpl_reader_benchmark.cc
new file mode 100644
index 0000000000000..40ed1822b95f0
--- /dev/null
+++ b/cpp/src/parquet/arrow/qpl_reader_benchmark.cc
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <cstdlib>
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "arrow/io/api.h"
+#include "arrow/table.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/config.h"  // for ARROW_CSV definition
+
+#ifdef ARROW_CSV
+#include "arrow/csv/api.h"
+#endif
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/test_util.h"
+
+#include "arrow/io/memory.h"
+#include "arrow/testing/gtest_util.h"
+
+using arrow::DataType;
+using arrow::Status;
+using arrow::Table;
+
+#define EXIT_NOT_OK(s)                                          \
+  do {                                                          \
+    ::arrow::Status _s = (s);                                   \
+    if (ARROW_PREDICT_FALSE(!_s.ok())) {                        \
+      std::cout << "Exiting: " << _s.ToString() << std::endl;   \
+      exit(EXIT_FAILURE);                                       \
+    }                                                           \
+  } while (0)
+
+namespace arrow {
+
+class ParquetTestException : public parquet::ParquetException {
+  using ParquetException::ParquetException;
+};
+
+const char* get_data_dir() {
+  const auto result = std::getenv("PARQUET_TEST_DATA");
+  if (!result || !result[0]) {
+    throw ParquetTestException(
+        "Please point the PARQUET_TEST_DATA environment "
+        "variable to the test data directory");
+  }
+  return result;
+}
+
+std::string get_data_file(const std::string& filename) {
+  std::stringstream ss;
+  ss << get_data_dir() << "/" << filename;
+  return ss.str();
+}
+
+// This should result in multiple pages for most primitive types
+constexpr int64_t BENCHMARK_SIZE = 10 * 1024 * 1024;
+
+static size_t countIndicesForType(std::shared_ptr<DataType> type) {
+  if (type->id() == arrow::Type::LIST) {
+    return countIndicesForType(
+        static_cast<arrow::ListType*>(type.get())->value_type());
+  }
+
+  if (type->id() == arrow::Type::STRUCT) {
+    size_t indices = 0;
+    auto* struct_type = static_cast<arrow::StructType*>(type.get());
+    for (int i = 0; i != struct_type->num_fields(); ++i)
+      indices += countIndicesForType(struct_type->field(i)->type());
+    return indices;
+  }
+
+  if (type->id() == arrow::Type::MAP) {
+    auto* map_type = static_cast<arrow::MapType*>(type.get());
+    return countIndicesForType(map_type->key_type()) +
+           countIndicesForType(map_type->item_type());
+  }
+
+  return 1;
+}
+
+static void getFileReaderAndSchema(
+    const std::string& file_name,
+    std::unique_ptr<parquet::arrow::FileReader>& file_reader,
+    std::shared_ptr<arrow::Schema>& schema) {
+  auto file = get_data_file(file_name);
+  std::shared_ptr<arrow::io::ReadableFile> infile;
+  PARQUET_ASSIGN_OR_THROW(
+      infile, arrow::io::ReadableFile::Open(file, arrow::default_memory_pool()));
+  EXIT_NOT_OK(parquet::arrow::OpenFile(std::move(infile),
+                                       arrow::default_memory_pool(), &file_reader));
+  EXIT_NOT_OK(file_reader->GetSchema(&schema));
+}
+
+class ParquetRowGroupReader {
+ public:
+  ParquetRowGroupReader() {}
+
+  void read(const std::string& filename) {
+    if (!file_reader) {
+      prepareReader(filename);
+    }
+
+    size_t parallel = 5;
+    while (row_group_current < row_group_total) {
+      std::vector<int> row_group_indexes;
+      for (; row_group_current < row_group_total &&
+             row_group_indexes.size() < parallel;
+           ++row_group_current) {
+        row_group_indexes.push_back(row_group_current);
+      }
+
+      if (row_group_indexes.empty()) {
+        return;
+      }
+      std::shared_ptr<arrow::Table> table;
+      EXIT_NOT_OK(
+          file_reader->ReadRowGroups(row_group_indexes, column_indices, &table));
+    }
+  }
+
+  void prepareReader(const std::string& filename) {
+    std::shared_ptr<arrow::Schema> schema;
+    getFileReaderAndSchema(filename, file_reader, schema);
+
+    row_group_total = file_reader->num_row_groups();
+    row_group_current = 0;
+
+    int index = 0;
+    for (int i = 0; i < schema->num_fields(); ++i) {
+      /// A STRUCT field needs as many indices as it has nested leaf
+      /// elements, so recursively count the number of indices we need
+      /// for this type.
+      int indexes_count = countIndicesForType(schema->field(i)->type());
+
+      for (int j = 0; j != indexes_count; ++j)
+        column_indices.push_back(index + j);
+      index += indexes_count;
+    }
+  }
+
+  std::unique_ptr<parquet::arrow::FileReader> file_reader;
+  int row_group_total = 0;
+  int row_group_current = 0;
+  // Indices of the columns to read from the Parquet file.
+  std::vector<int> column_indices;
+};
+
+template <typename T>
+void SetBytesProcessed(::benchmark::State& state, int64_t num_values = BENCHMARK_SIZE) {
+  const int64_t items_processed = state.iterations() * num_values;
+  const int64_t bytes_processed = items_processed * sizeof(T);
+
+  state.SetItemsProcessed(items_processed);
+  state.SetBytesProcessed(bytes_processed);
+}
+
+template <int rows, int width>
+static void BM_ReadFile(::benchmark::State& state) {
+  while (state.KeepRunning()) {
+    ParquetRowGroupReader reader;
+    std::string file_name = "single_column_" + std::to_string(state.range(0)) +
+                            "kw_" + std::to_string(state.range(1)) + ".parquet";
+    reader.read(file_name);
+  }
+
+  SetBytesProcessed<int32_t>(state);
+}
+
+template <int rows, int width>
+static void BM_ReadFileDiffBitWidth(::benchmark::State& state) {
+  while (state.KeepRunning()) {
+    ParquetRowGroupReader reader;
+    std::string file_name = "sc_" + std::to_string(state.range(0)) + "kw_multibit_" +
+                            std::to_string(state.range(1)) + ".parquet";
+    reader.read(file_name);
+  }
+
+  SetBytesProcessed<int32_t>(state);
+}
+
+// The two arguments select one of the pre-generated input files: the first
+// encodes the file's row count, the second the dictionary size (BM_ReadFile)
+// or the RLE bit width (BM_ReadFileDiffBitWidth) used when the file was
+// generated.
+BENCHMARK_TEMPLATE2(BM_ReadFile, 1, 64)
+    ->Args({1, 64})
+    ->Args({2, 64})
+    ->Args({3, 64})
+    ->Args({1, 512})
+    ->Args({2, 512})
+    ->Args({3, 512});
+
+BENCHMARK_TEMPLATE2(BM_ReadFileDiffBitWidth, 1, 2)
+    ->Args({1, 3})
+    ->Args({1, 5})
+    ->Args({1, 6})
+    ->Args({1, 8})
+    ->Args({1, 9})
+    ->Args({1, 10})
+    ->Args({1, 11})
+    ->Args({1, 12})
+    ->Args({1, 13})
+    ->Args({1, 14})
+    ->Args({1, 16})
+    ->Args({1, 18});
+
+}  // namespace arrow
diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc
index 44f762d7113e4..66528b74a98b5 100644
--- a/cpp/src/parquet/encoding.cc
+++ b/cpp/src/parquet/encoding.cc
@@ -1494,8 +1494,13 @@ class DictDecoderImpl : public DecoderImpl, virtual public DictDecoder {
   int Decode(T* buffer, int num_values) override {
     num_values = std::min(num_values, num_values_);
     int decoded_values =
+#ifndef ENABLE_QPL_ANALYSIS
         idx_decoder_.GetBatchWithDict(reinterpret_cast<const T*>(dictionary_->data()),
                                       dictionary_length_, buffer, num_values);
+#else
+        idx_decoder_.GetBatchWithDictIAA(
+            reinterpret_cast<const T*>(dictionary_->data()), dictionary_length_,
+            buffer, num_values);
+#endif
     if (decoded_values != num_values) {
       ParquetException::EofException();
     }
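Note that the switch in Decode() is purely compile-time: a QPL build running on a machine without IAA will fail the decode and surface it as an EOF-style error. One way to probe availability at runtime, using the pool from this patch (a sketch; `IaaAvailable` is hypothetical):

```cpp
#ifdef ENABLE_QPL_ANALYSIS
#include "arrow/util/qpl_job_pool.h"

// True once the pool has initialized successfully, i.e. qpl_init_job()
// succeeded against the hardware path. Touching instance() triggers the
// lazy first-time initialization.
bool IaaAvailable() {
  using arrow::internal::detail::QplJobHWPool;
  QplJobHWPool::instance();
  return QplJobHWPool::isJobPoolReady();
}
#endif
```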
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 374a02cf491f9..aa8dc8c98412d 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -28,6 +28,12 @@
 #include "parquet/platform.h"
 #include "parquet/types.h"
 
+#ifdef ENABLE_QPL_ANALYSIS
+#include <qpl/qpl.h>
+
+#include <vector>
+#endif
+
 namespace arrow {
 
 class Array;
@@ -278,7 +284,11 @@ class TypedDecoder : virtual public Decoder {
   /// \return The number of values decoded. Should be identical to max_values except
   /// at the end of the current data page.
   virtual int Decode(T* buffer, int max_values) = 0;
-
+#ifdef ENABLE_QPL_ANALYSIS
+  /// Variant of Decode() that exposes the underlying QPL job state to the
+  /// caller; the default implementation simply falls back to Decode().
+  virtual int DecodeWithIAA(T* buffer, int num_values, int32_t* qpl_job_id,
+                            qpl_job** job, std::vector<uint8_t>** destination,
+                            T** out) {
+    return Decode(buffer, num_values);
+  }
+#endif
   /// \brief Decode the values in this data page but leave spaces for null entries.
   ///
   /// \param[in] buffer destination for decoded values
diff --git a/cpp/submodules/qpl b/cpp/submodules/qpl
new file mode 160000
index 0000000000000..cdc8442f7a5e7
--- /dev/null
+++ b/cpp/submodules/qpl
@@ -0,0 +1 @@
+Subproject commit cdc8442f7a5e7a6ff6eea39c69665e0c5034d85d
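With the submodule in place and ARROW_WITH_QPL=ON, the accelerated path is exercised by any ordinary dictionary-encoded read; nothing changes at the API surface. A minimal read sketch (the file path is a placeholder):

```cpp
#include <memory>
#include <string>

#include "arrow/io/api.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "parquet/arrow/reader.h"

// Ordinary read path; dictionary-encoded columns go through
// DictDecoderImpl::Decode() and, in QPL builds, the IAA-accelerated RLE
// decode added by this patch.
arrow::Status ReadWholeFile(const std::string& path,
                            std::shared_ptr<arrow::Table>* table) {
  ARROW_ASSIGN_OR_RAISE(auto infile, arrow::io::ReadableFile::Open(path));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
  return reader->ReadTable(table);
}
```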