Skip to content

Commit

Permalink
Add Intel®-IAA/QPL-based Parquet RLE Decode, qpl is thirdparty tool c…
Browse files Browse the repository at this point in the history
…hain
  • Loading branch information
yaqi-zhao committed Nov 24, 2022
1 parent 5e53978 commit 477d068
Show file tree
Hide file tree
Showing 14 changed files with 821 additions and 3 deletions.
13 changes: 13 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,15 @@ if(UNIX)
add_custom_target(iwyu-all ${BUILD_SUPPORT_DIR}/iwyu/iwyu.sh all)
endif(UNIX)

# Enable the Intel QPL code paths when ARROW_WITH_QPL is ON.
if(ARROW_WITH_QPL)
  # Preprocessor switch consumed by sources guarded with
  # `#ifdef ENABLE_QPL_ANALYSIS`.
  add_definitions(-DENABLE_QPL_ANALYSIS)
  # NOTE(review): ARROW_STATIC_LINK_LIBS and ARROW_STATIC_INSTALL_INTERFACE_LIBS
  # are assigned with set() further down in this file; appends made before that
  # assignment are discarded -- confirm these two appends are still needed here.
  list(APPEND ARROW_STATIC_LINK_LIBS qpl::qpl)
  if("${QPL_SOURCE}" STREQUAL "SYSTEM")
    list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS qpl::qpl)
  endif()
  # ENABLE_QPL_ANALYSIS is a compile definition, not a CMake variable, so
  # report the fact that it is defined rather than expanding an unset variable.
  message(STATUS "QPL support enabled: compiling with -DENABLE_QPL_ANALYSIS")
endif()

# datetime code used by iOS requires zlib support
if(IOS)
set(ARROW_WITH_ZLIB ON)
Expand Down Expand Up @@ -645,6 +654,10 @@ set(ARROW_SHARED_INSTALL_INTERFACE_LIBS)
set(ARROW_STATIC_LINK_LIBS arrow::flatbuffers arrow::hadoop)
set(ARROW_STATIC_INSTALL_INTERFACE_LIBS)

# Link QPL into both flavours of libarrow. This has to happen after the link
# lists are initialised with set() just above, otherwise the entries are lost.
if(ARROW_WITH_QPL)
  list(APPEND ARROW_SHARED_LINK_LIBS qpl::qpl)
  list(APPEND ARROW_STATIC_LINK_LIBS qpl::qpl)
  # A system-provided QPL must also be visible to consumers of the installed
  # static library; a bundled build is handled via ARROW_BUNDLED_STATIC_LIBS.
  if("${QPL_SOURCE}" STREQUAL "SYSTEM")
    list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS qpl::qpl)
  endif()
endif()

if(ARROW_USE_BOOST)
list(APPEND ARROW_SHARED_LINK_LIBS Boost::headers)
list(APPEND ARROW_STATIC_LINK_LIBS Boost::headers)
Expand Down
2 changes: 2 additions & 0 deletions cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ takes precedence over ccache if a storage backend is configured" ON)

define_option(ARROW_PLASMA "Build the plasma object store along with Arrow" OFF)

# QPL is an optional accelerator library for Intel hardware, so it is opt-in:
# a default of ON would make every build fetch and compile it.
define_option(ARROW_WITH_QPL "Enable Intel® Query Processing Library" OFF)

define_option(ARROW_PYTHON
"Build some components needed by PyArrow.;\
(This is a deprecated option. Use CMake presets instead.)"
Expand Down
52 changes: 51 additions & 1 deletion cpp/cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ set(ARROW_THIRDPARTY_DEPENDENCIES
utf8proc
xsimd
ZLIB
zstd)
zstd
QPL)

# For backward compatibility. We use "BOOST_SOURCE" if "Boost_SOURCE"
# isn't specified and "BOOST_SOURCE" is specified.
Expand Down Expand Up @@ -203,6 +204,8 @@ macro(build_dependency DEPENDENCY_NAME)
build_zlib()
elseif("${DEPENDENCY_NAME}" STREQUAL "zstd")
build_zstd()
elseif("${DEPENDENCY_NAME}" STREQUAL "QPL")
build_qpl()
else()
message(FATAL_ERROR "Unknown thirdparty dependency to build: ${DEPENDENCY_NAME}")
endif()
Expand Down Expand Up @@ -2203,6 +2206,53 @@ if(ARROW_WITH_RAPIDJSON)
endif()
endif()


# Build Intel QPL from source as an ExternalProject and expose the result as
# the imported static target `qpl::qpl`.
#
# Sets (in the caller's scope, because this is a macro): QPL_PREFIX,
# QPL_STATIC_LIB, QPL_LIBRARIES, QPL_INCLUDE_DIRS and QPL_VENDORED.
macro(build_qpl)
  message(STATUS "Building QPL from source")
  set(QPL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qpl_ep/src/qpl_ep-install")
  if(MSVC)
    set(QPL_STATIC_LIB_NAME qplstatic.lib)
  else()
    set(QPL_STATIC_LIB_NAME libqpl.a)
  endif()

  # The archive path below must match where the external build installs it.
  # GNUInstallDirs picks "lib" or "lib64" depending on the distribution, so
  # pin the library directory explicitly instead of relying on the default.
  # NOTE(review): assumes QPL's install rules honour CMAKE_INSTALL_LIBDIR;
  # if they hard-code lib64 the extra argument is simply unused.
  set(QPL_LIB_DIR "lib64")
  set(QPL_STATIC_LIB "${QPL_PREFIX}/${QPL_LIB_DIR}/${QPL_STATIC_LIB_NAME}")

  # Only real -D arguments belong here: every entry is forwarded verbatim to
  # the cmake command line of the external project.
  set(QPL_CMAKE_ARGS
      ${EP_COMMON_CMAKE_ARGS}
      -DCMAKE_BUILD_TYPE=Release
      "-DCMAKE_INSTALL_LIBDIR=${QPL_LIB_DIR}"
      "-DCMAKE_INSTALL_PREFIX=${QPL_PREFIX}")

  externalproject_add(qpl_ep
                      ${EP_LOG_OPTIONS}
                      GIT_REPOSITORY https://github.com/intel/qpl.git
                      GIT_TAG v0.2.1
                      GIT_SUBMODULES_RECURSE TRUE
                      BUILD_BYPRODUCTS "${QPL_STATIC_LIB}"
                      CMAKE_ARGS ${QPL_CMAKE_ARGS})

  # The include directory must exist at configure time, otherwise CMake
  # rejects it as INTERFACE_INCLUDE_DIRECTORIES of an imported target.
  file(MAKE_DIRECTORY "${QPL_PREFIX}/include")

  add_library(qpl::qpl STATIC IMPORTED)
  set(QPL_LIBRARIES "${QPL_STATIC_LIB}")
  set(QPL_INCLUDE_DIRS "${QPL_PREFIX}/include")
  set_target_properties(qpl::qpl
                        PROPERTIES IMPORTED_LOCATION "${QPL_LIBRARIES}"
                                   INTERFACE_INCLUDE_DIRECTORIES
                                   "${QPL_INCLUDE_DIRS}")

  # Make sure the external build runs before anything that uses the target.
  add_dependencies(toolchain qpl_ep)
  add_dependencies(qpl::qpl qpl_ep)

  list(APPEND ARROW_BUNDLED_STATIC_LIBS qpl::qpl)
  set(QPL_VENDORED TRUE)
endmacro()

# Resolve the QPL dependency only when QPL support was requested.
if(ARROW_WITH_QPL)
  resolve_dependency(QPL
                     PC_PACKAGE_NAMES
                     qpl)
endif()

macro(build_xsimd)
message(STATUS "Building xsimd from source")
set(XSIMD_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/xsimd_ep/src/xsimd_ep-install")
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ set(ARROW_SRCS
util/uri.cc
util/utf8.cc
util/value_parsing.cc
util/qpl_job_pool.cc
vendored/base64.cpp
vendored/datetime/tz.cpp
vendored/double-conversion/bignum.cc
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/util/bit_stream_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,15 @@ class BitReader {
/// Maximum byte length of a vlq encoded int64
static constexpr int kMaxVlqByteLengthForInt64 = 10;

const uint8_t * getBuffer() {
return buffer_ - 1;
}

int getBufferLen() {
return max_bytes_ + 1;
}


private:
const uint8_t* buffer_;
int max_bytes_;
Expand Down
120 changes: 120 additions & 0 deletions cpp/src/arrow/util/qpl_job_pool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <arrow/status.h>

#ifdef ENABLE_QPL_ANALYSIS
#include "arrow/util/qpl_job_pool.h"

namespace arrow {
namespace internal {
namespace detail {

// Out-of-class definitions of the static data members of QplJobHWPool.
// One job pointer per slot; entries are filled in by initQPLJob().
std::array<qpl_job *, QplJobHWPool::MAX_JOB_NUMBER> QplJobHWPool::hw_job_ptr_pool;
// One flag per slot; true while the slot is locked (see tryLockJob/unLockJob).
std::array<std::atomic_bool, QplJobHWPool::MAX_JOB_NUMBER> QplJobHWPool::job_ptr_locks;
// Set to true only after initQPLJob() initialized every job successfully.
bool QplJobHWPool::job_pool_ready = false;
// Single allocation that backs all job objects referenced by hw_job_ptr_pool.
std::unique_ptr<uint8_t[]> QplJobHWPool::hw_jobs_buffer;

/// Returns the pool instance, constructed on first use as a function-local
/// static.
QplJobHWPool& QplJobHWPool::instance() {
  static QplJobHWPool singleton;
  return singleton;
}

QplJobHWPool::QplJobHWPool()
: random_engine(std::random_device()())
, distribution(0, MAX_JOB_NUMBER - 1) {
(void)initQPLJob();
}

/// Finalizes every job still registered in the pool and marks the pool as
/// not ready.
QplJobHWPool::~QplJobHWPool() {
  for (auto& job : hw_job_ptr_pool) {
    if (!job) {
      continue;
    }
    qpl_fini_job(job);
    job = nullptr;
  }
  job_pool_ready = false;
}

/// Allocates one contiguous buffer for MAX_JOB_NUMBER hardware-path job
/// objects, initializes each job and registers it (unlocked) in the pool.
///
/// \return Status::OK() when every job was initialized; Status::Invalid()
/// otherwise. On failure no job stays registered and the buffer is released,
/// so a later retry or the destructor never touches stale job pointers.
arrow::Status QplJobHWPool::initQPLJob() {
  job_pool_ready = false;

  // Finalizes and unregisters the first `count` slots that hold a job.
  auto finalize_jobs = [](uint32_t count) {
    for (uint32_t slot = 0; slot < count; ++slot) {
      if (hw_job_ptr_pool[slot] != nullptr) {
        qpl_fini_job(hw_job_ptr_pool[slot]);
        hw_job_ptr_pool[slot] = nullptr;
      }
    }
  };

  /// Get size required for saving a single qpl job object
  uint32_t job_size = 0;
  if (qpl_get_job_size(qpl_path_hardware, &job_size) != QPL_STS_OK || job_size == 0) {
    return arrow::Status::Invalid(
        "Failed to query the QPL job size for the hardware path. Please check if Intel "
        "In-Memory Analytics Accelerator (IAA) is properly set up!");
  }

  // Drop anything left over from an earlier attempt before its backing
  // storage is replaced below.
  finalize_jobs(MAX_JOB_NUMBER);

  /// Allocate entire buffer for storing all job objects
  hw_jobs_buffer =
      std::make_unique<uint8_t[]>(static_cast<size_t>(job_size) * MAX_JOB_NUMBER);

  /// Initialize pool for storing all job object pointers; job `index` lives
  /// at byte offset `index * job_size` inside the shared buffer.
  for (uint32_t index = 0; index < MAX_JOB_NUMBER; ++index) {
    qpl_job* job = reinterpret_cast<qpl_job*>(hw_jobs_buffer.get() +
                                              static_cast<size_t>(index) * job_size);
    if (qpl_init_job(qpl_path_hardware, job) != QPL_STS_OK) {
      // Roll back the jobs initialized so far; they point into the buffer
      // that is released right after.
      finalize_jobs(index);
      hw_jobs_buffer.reset();
      return arrow::Status::Invalid("Initialization of hardware IAA failed. Please check if Intel In-Memory Analytics Accelerator (IAA) is properly set up!");
    }
    hw_job_ptr_pool[index] = job;
    unLockJob(index);
  }

  job_pool_ready = true;
  return arrow::Status::OK();
}

/// Locks a randomly chosen free job slot and returns its job object.
///
/// \param[out] job_id set to `MAX_JOB_NUMBER - slot` on success; pass it to
///             releaseJob() to free the slot again. Left untouched on failure.
/// \return the job of the locked slot, or nullptr when the pool cannot be
///         initialized or no free slot was found within MAX_JOB_NUMBER + 1
///         random probes.
qpl_job* QplJobHWPool::acquireJob(uint32_t& job_id) {
  if (!isJobPoolReady() && !initQPLJob().ok()) {
    return nullptr;
  }

  // The slots are guarded by atomics so that several threads can share the
  // pool, but std::mt19937 must not be advanced concurrently. Each thread
  // therefore draws from its own engine instead of the shared member.
  thread_local std::mt19937 engine{std::random_device{}()};
  std::uniform_int_distribution<uint32_t> pick_slot(0, MAX_JOB_NUMBER - 1);

  for (uint32_t attempt = 0; attempt <= MAX_JOB_NUMBER; ++attempt) {
    const uint32_t index = pick_slot(engine);
    if (tryLockJob(index)) {
      job_id = MAX_JOB_NUMBER - index;
      return hw_job_ptr_pool[index];
    }
  }
  return nullptr;
}

/// Unlocks the slot that acquireJob() reported as `job_id`. Does nothing
/// while the pool is not ready.
void QplJobHWPool::releaseJob(uint32_t job_id) {
  if (!isJobPoolReady()) {
    return;
  }
  // acquireJob() encodes the slot as MAX_JOB_NUMBER - index; undo that here.
  const uint32_t slot = MAX_JOB_NUMBER - job_id;
  unLockJob(slot);
}

/// Attempts to flip the lock flag of slot `index` from false to true.
/// Returns true when this call took the lock; false when the slot was
/// already locked or `index` is out of range.
bool QplJobHWPool::tryLockJob(uint32_t index) {
  if (index < MAX_JOB_NUMBER) {
    bool was_free = false;
    return job_ptr_locks[index].compare_exchange_strong(was_free, true);
  }
  return false;
}

/// Clears the lock flag of slot `index`; out-of-range indices are ignored.
void QplJobHWPool::unLockJob(uint32_t index) {
  if (index < MAX_JOB_NUMBER) {
    job_ptr_locks[index].store(false);
  }
}
} // namespace detail
} // namespace internal
} // namespace arrow
#endif
70 changes: 70 additions & 0 deletions cpp/src/arrow/util/qpl_job_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <array>
#include <atomic>
#include <cstdint>
#include <memory>
#include <random>
#include <utility>
#include <vector>
#include <arrow/status.h>

#ifdef ENABLE_QPL_ANALYSIS
#include <qpl/qpl.hpp>
#include <qpl/qpl.h>


namespace arrow {
namespace internal {
namespace detail {

/// QplJobHWPool is a resource pool that hands out QPL job objects.
/// A job object stores the context information used while a job is
/// offloaded to the hardware accelerator.
class QplJobHWPool {
public:
/// Seeds the random engine and makes a first attempt to initialize the jobs.
QplJobHWPool();
/// Finalizes every job that is still registered in the pool.
~QplJobHWPool();

/// Returns the pool instance (a function-local static).
static QplJobHWPool & instance();

/// Locks a randomly chosen free slot and returns its job, storing the id to
/// pass to releaseJob() in job_id. Returns nullptr when the pool cannot be
/// initialized or no free slot is found within the retry limit.
qpl_job * acquireJob(uint32_t & job_id);
/// Unlocks the slot identified by a job_id obtained from acquireJob().
static void releaseJob(uint32_t job_id);
/// True once initQPLJob() has initialized every job successfully.
static const bool & isJobPoolReady() { return job_pool_ready; }

private:
/// Atomically marks slot `index` as locked; returns false if it was already
/// locked or the index is out of range.
static bool tryLockJob(uint32_t index);
/// Marks slot `index` as free; out-of-range indices are ignored.
static void unLockJob(uint32_t index);
/// Allocates the job buffer and initializes all jobs on the hardware path.
static arrow::Status initQPLJob();

/// Maximum jobs running in parallel supported by IAA hardware
static constexpr auto MAX_JOB_NUMBER = 512;
/// Entire buffer for storing all job objects
static std::unique_ptr<uint8_t[]> hw_jobs_buffer;
/// Job pool for storing all job object pointers
static std::array<qpl_job *, MAX_JOB_NUMBER> hw_job_ptr_pool;
/// Locks for accessing each job object pointers
static std::array<std::atomic_bool, MAX_JOB_NUMBER> job_ptr_locks;
/// Whether the pool has been fully initialized.
static bool job_pool_ready;
/// Random source used to pick the slot that acquireJob() probes.
std::mt19937 random_engine;
/// Uniform distribution over the slot indices.
std::uniform_int_distribution<int> distribution;
};
} // namespace detail
} // namespace internal
} // namespace arrow
#endif
Loading

0 comments on commit 477d068

Please sign in to comment.