Apply patches to Arrow 16.1.0 (#68)
* Empty commit

* Merge pull request #66 from copperybean/release-13.0.0

Check validity of bit offset when reading bit-packed values

(cherry picked from commit 5cfccd8)

* Merge pull request #47 from ClickHouse/fix-uninit-value-msan

Fix possible use-of-uninitialized-value

(cherry picked from commit ba5c679)

* Merge pull request #39 from ClickHouse/count-from-record-batch

Allow getting the number of rows in a record batch

(cherry picked from commit 1d93838)

* Merge pull request #9 from taiyang-li/raw_orc_reader

Add interface to get the raw ORC reader from adapters

(cherry picked from commit ce6b7af)

* Merge pull request #10 from taiyang-li/fix_pr_9

Fix build issue introduced by https://github.com/ClickHouse-Extras…

(cherry picked from commit 20dc6ad)

* Merge pull request #14 from ClickHouse/fix-deadlock

Fix deadlock with msan

(cherry picked from commit b41ff44)

* Merge pull request #15 from ClickHouse/try-fix-data-race

Fix 'undefined symbol: pthread_atfork' on PowerPC64

(cherry picked from commit 450a563)

* Merge pull request #16 from ClickHouse/remove-abort-in-logging

Don't abort in ~CerrLog

(cherry picked from commit d03245f)

* Merge pull request #17 from bigo-sg/allow_map_key_optional

Allow Parquet map key to be optional

(cherry picked from commit 0d6d07f)

* Fix build

(cherry picked from commit 3264fda)

---------

Co-authored-by: Michael Kolupaev <[email protected]>
Co-authored-by: Kruglov Pavel <[email protected]>
3 people authored Oct 16, 2024
1 parent 7dd1d34 commit 6e2574f
Showing 11 changed files with 105 additions and 16 deletions.
14 changes: 14 additions & 0 deletions cpp/src/arrow/adapters/orc/adapter.cc
@@ -222,6 +222,11 @@ class ORCFileReader::Impl {
return Init();
}

virtual liborc::Reader* GetRawORCReader() {
return reader_.get();
}


Status Init() {
int64_t nstripes = reader_->getNumberOfStripes();
stripes_.resize(static_cast<size_t>(nstripes));
@@ -559,6 +564,15 @@ ORCFileReader::ORCFileReader() { impl_.reset(new ORCFileReader::Impl()); }

ORCFileReader::~ORCFileReader() {}

liborc::Reader* ORCFileReader::GetRawORCReader() {
return impl_->GetRawORCReader();
}

Status ORCFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
MemoryPool* pool, std::unique_ptr<ORCFileReader>* reader) {
return Open(file, pool).Value(reader);
}

Result<std::unique_ptr<ORCFileReader>> ORCFileReader::Open(
const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool) {
#ifdef ARROW_ORC_NEED_TIME_ZONE_DATABASE_CHECK
14 changes: 14 additions & 0 deletions cpp/src/arrow/adapters/orc/adapter.h
@@ -30,6 +30,7 @@
#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
#include "arrow/adapters/orc/util.h"

namespace arrow {
namespace adapters {
@@ -53,6 +54,19 @@ class ARROW_EXPORT ORCFileReader {
public:
~ORCFileReader();

/// \brief Creates a new ORC reader.
///
/// \param[in] file the data source
/// \param[in] pool a MemoryPool to use for buffer allocations
/// \param[out] reader the returned reader object
/// \return Status
ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool,
std::unique_ptr<ORCFileReader>* reader);

/// \brief Get the underlying liborc reader.
liborc::Reader* GetRawORCReader();

/// \brief Creates a new ORC reader
///
/// \param[in] file the data source
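
A minimal usage sketch of the new accessor (not part of the patch): open an ORC file through the Arrow adapter, then reach the underlying liborc::Reader for features the adapter does not expose. The file path and function name are illustrative.

#include "arrow/adapters/orc/adapter.h"
#include "arrow/io/file.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"

// Sketch only: "example.orc" is an illustrative path.
arrow::Status InspectRawOrcReader() {
  ARROW_ASSIGN_OR_RAISE(auto file, arrow::io::ReadableFile::Open("example.orc"));
  ARROW_ASSIGN_OR_RAISE(auto reader,
                        arrow::adapters::orc::ORCFileReader::Open(
                            file, arrow::default_memory_pool()));
  liborc::Reader* raw = reader->GetRawORCReader();
  // liborc-specific calls would go here; the adapter keeps owning the reader.
  (void)raw;
  return arrow::Status::OK();
}
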
17 changes: 17 additions & 0 deletions cpp/src/arrow/ipc/reader.cc
@@ -1369,6 +1369,23 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
return total;
}

Result<int64_t> RecordBatchCountRows(int i) override {
DCHECK_GE(i, 0);
DCHECK_LT(i, num_record_batches());
ARROW_ASSIGN_OR_RAISE(auto outer_message,
ReadMessageFromBlock(GetRecordBatchBlock(i)));
auto metadata = outer_message->metadata();
const flatbuf::Message* message = nullptr;
RETURN_NOT_OK(
internal::VerifyMessage(metadata->data(), metadata->size(), &message));
auto batch = message->header_as_RecordBatch();
if (batch == nullptr) {
return Status::IOError(
"Header-type of flatbuffer-encoded Message is not RecordBatch.");
}
return batch->length();
}

Status Open(const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
const IpcReadOptions& options) {
owned_file_ = file;
2 changes: 2 additions & 0 deletions cpp/src/arrow/ipc/reader.h
@@ -203,6 +203,8 @@ class ARROW_EXPORT RecordBatchFileReader
/// \brief Computes the total number of rows in the file.
virtual Result<int64_t> CountRows() = 0;

virtual Result<int64_t> RecordBatchCountRows(int i) = 0;

/// \brief Begin loading metadata for the desired batches into memory.
///
/// This method will also begin loading all dictionaries messages into memory.
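
A usage sketch of the new per-batch count (not part of the patch): because RecordBatchCountRows(i) only verifies and parses the flatbuffer metadata of batch i, as the reader.cc hunk above shows, the row count comes back without materializing the batch. The file path and function name are illustrative.

#include <cstdint>
#include <iostream>

#include "arrow/io/file.h"
#include "arrow/ipc/reader.h"
#include "arrow/result.h"

arrow::Status PrintPerBatchRowCounts() {
  ARROW_ASSIGN_OR_RAISE(auto file, arrow::io::ReadableFile::Open("data.arrow"));
  ARROW_ASSIGN_OR_RAISE(auto reader, arrow::ipc::RecordBatchFileReader::Open(file));
  for (int i = 0; i < reader->num_record_batches(); ++i) {
    // Reads only the batch's metadata block, not its body buffers.
    ARROW_ASSIGN_OR_RAISE(int64_t rows, reader->RecordBatchCountRows(i));
    std::cout << "batch " << i << ": " << rows << " rows" << std::endl;
  }
  return arrow::Status::OK();
}
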
5 changes: 5 additions & 0 deletions cpp/src/arrow/util/bit_stream_utils.h
@@ -272,6 +272,11 @@ inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
#pragma warning(push)
#pragma warning(disable : 4800)
#endif
if (ARROW_PREDICT_FALSE(*bit_offset >= 64)) {
auto msg = std::string("invalid bit offset: ") + std::to_string(*bit_offset);
msg += ", may be malformed num_bits: " + std::to_string(num_bits);
throw std::runtime_error(msg);
}
*v = static_cast<T>(bit_util::TrailingBits(*buffered_values, *bit_offset + num_bits) >>
*bit_offset);
#ifdef _MSC_VER
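
For context, a standalone sketch of the extraction that the guarded line performs, with a hand-rolled TrailingBits rather than Arrow's helper: the value is the num_bits bits starting at bit_offset within one buffered 64-bit word, so an offset of 64 or more (which a malformed bit width can produce) would shift past the word, and the new check rejects it.

#include <cstdint>
#include <stdexcept>
#include <string>

// Illustrative only: keep the trailing num_bits bits of v.
inline uint64_t TrailingBits(uint64_t v, int num_bits) {
  return num_bits >= 64 ? v : (v & ((uint64_t{1} << num_bits) - 1));
}

// Extract num_bits bits starting at bit_offset, mirroring GetValue_'s expression.
inline uint64_t ExtractBits(uint64_t buffered_values, int bit_offset, int num_bits) {
  if (bit_offset >= 64) {
    // A malformed bit width in the input can push the offset out of range.
    throw std::runtime_error("invalid bit offset: " + std::to_string(bit_offset));
  }
  return TrailingBits(buffered_values, bit_offset + num_bits) >> bit_offset;
}
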
15 changes: 8 additions & 7 deletions cpp/src/arrow/util/logging.cc
@@ -24,6 +24,7 @@
#endif
#include <cstdlib>
#include <iostream>
#include <sstream>

#ifdef ARROW_USE_GLOG

@@ -65,33 +66,33 @@ class CerrLog {
public:
explicit CerrLog(ArrowLogLevel severity) : severity_(severity), has_logged_(false) {}

virtual ~CerrLog() {
virtual ~CerrLog() noexcept(false) {
if (has_logged_) {
std::cerr << std::endl;
stream << std::endl;
}
if (severity_ == ArrowLogLevel::ARROW_FATAL) {
PrintBackTrace();
std::abort();
throw std::runtime_error(stream.str());
}
}

std::ostream& Stream() {
has_logged_ = true;
return std::cerr;
return stream;
}

template <class T>
CerrLog& operator<<(const T& t) {
if (severity_ != ArrowLogLevel::ARROW_DEBUG) {
has_logged_ = true;
std::cerr << t;
stream << t;
}
return *this;
}

protected:
const ArrowLogLevel severity_;
bool has_logged_;
std::stringstream stream;

void PrintBackTrace() {
#ifdef ARROW_WITH_BACKTRACE
@@ -248,7 +249,7 @@ std::ostream& ArrowLog::Stream() {

bool ArrowLog::IsEnabled() const { return is_enabled_; }

ArrowLog::~ArrowLog() {
ArrowLog::~ArrowLog() noexcept(false) {
if (logging_provider_ != nullptr) {
delete reinterpret_cast<LoggingProvider*>(logging_provider_);
logging_provider_ = nullptr;
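
A standalone sketch of the mechanism this patch relies on, with illustrative names: a destructor declared noexcept(false) may throw, and the exception propagates to the caller instead of the process aborting, which is why ~CerrLog, ~ArrowLog, and ~ArrowLogBase all gain noexcept(false) here.

#include <iostream>
#include <sstream>
#include <stdexcept>

// Illustrative stand-in for the patched CerrLog: buffer the message, then
// throw from the destructor instead of calling std::abort().
class FatalLogSketch {
 public:
  ~FatalLogSketch() noexcept(false) { throw std::runtime_error(stream_.str()); }
  std::ostream& Stream() { return stream_; }

 private:
  std::stringstream stream_;
};

int main() {
  try {
    FatalLogSketch().Stream() << "fatal condition detected";
  } catch (const std::runtime_error& e) {
    std::cerr << "caught: " << e.what() << std::endl;  // instead of aborting
  }
  return 0;
}
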
4 changes: 2 additions & 2 deletions cpp/src/arrow/util/logging.h
@@ -157,7 +157,7 @@ enum class ArrowLogLevel : int {
// This is also a null log which does not output anything.
class ARROW_EXPORT ArrowLogBase {
public:
virtual ~ArrowLogBase() {}
virtual ~ArrowLogBase() noexcept(false) {}

virtual bool IsEnabled() const { return false; }

@@ -176,7 +176,7 @@ class ARROW_EXPORT ArrowLog : public ArrowLogBase {
class ARROW_EXPORT ArrowLog : public ArrowLogBase {
public:
ArrowLog(const char* file_name, int line_number, ArrowLogLevel severity);
~ArrowLog() override;
~ArrowLog() noexcept(false) override;

/// Return whether or not current logging instance is enabled.
///
33 changes: 28 additions & 5 deletions cpp/src/arrow/util/mutex.cc
@@ -19,7 +19,7 @@

#include <mutex>

#ifndef _WIN32
#if !defined( _WIN32) && !defined(__ppc64__)
#include <pthread.h>
#include <atomic>
#endif
@@ -59,7 +59,7 @@ Mutex::Guard Mutex::Lock() {

Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {}

#ifndef _WIN32
#if !defined( _WIN32) && !defined(__ppc64__)
namespace {

struct AfterForkState {
@@ -71,19 +71,42 @@ struct AfterForkState {
// The leak (only in child processes) is a small price to pay for robustness.
Mutex* mutex = nullptr;

enum State {
INITIALIZED,
IN_PROCESS,
NOT_INITIALIZED,
};

std::atomic_int state = INITIALIZED;

private:
AfterForkState() {
pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork);
}

static void AfterFork() { instance.mutex = new Mutex; }
static void AfterFork() { instance.state.store(NOT_INITIALIZED); }

};

AfterForkState AfterForkState::instance;
} // namespace

Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; }
#endif // _WIN32
Mutex* GlobalForkSafeMutex() {
if (AfterForkState::instance.state.load() == AfterForkState::State::INITIALIZED) {
return AfterForkState::instance.mutex;
}

int expected = AfterForkState::State::NOT_INITIALIZED;
if (AfterForkState::instance.state.compare_exchange_strong(expected, AfterForkState::State::IN_PROCESS)) {
AfterForkState::instance.mutex = new Mutex;
AfterForkState::instance.state.store(AfterForkState::State::INITIALIZED);
} else {
while (AfterForkState::instance.state.load() != AfterForkState::State::INITIALIZED);
}

return AfterForkState::instance.mutex;
}
#endif // _WIN32 and __ppc64__

} // namespace util
} // namespace arrow
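
A standalone sketch of the pattern used above, with illustrative names: the pthread_atfork child handler only flips an atomic flag, and the first caller after fork() allocates the mutex, while racing threads spin until initialization finishes. On __ppc64__ the whole block is compiled out, so pthread_atfork is never referenced there.

#include <atomic>
#include <mutex>

// Illustrative names only; this mirrors GlobalForkSafeMutex() above.
enum InitState { kInitialized, kInProgress, kNotInitialized };

std::atomic<int> g_state{kInitialized};  // the parent process starts "initialized"
std::mutex* g_mutex = nullptr;           // deliberately leaked, as in the patch

// Called from the pthread_atfork child handler in the real code.
void OnForkChild() { g_state.store(kNotInitialized); }

std::mutex* LazyForkSafeMutex() {
  if (g_state.load() == kInitialized) return g_mutex;
  int expected = kNotInitialized;
  if (g_state.compare_exchange_strong(expected, kInProgress)) {
    g_mutex = new std::mutex;  // first caller after fork() allocates
    g_state.store(kInitialized);
  } else {
    // Another thread is allocating; spin until it publishes the mutex.
    while (g_state.load() != kInitialized) {
    }
  }
  return g_mutex;
}

As documented in mutex.h, the parent process still gets a null pointer; only child processes allocate a fresh mutex on first use.
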
2 changes: 1 addition & 1 deletion cpp/src/arrow/util/mutex.h
@@ -60,7 +60,7 @@ class ARROW_EXPORT Mutex {
std::unique_ptr<Impl, void (*)(Impl*)> impl_;
};

#ifndef _WIN32
#if !defined(_WIN32) && !defined(__ppc64__)
/// Return a pointer to a process-wide, process-specific Mutex that can be used
/// at any point in a child process. NULL is returned when called in the parent.
///
13 changes: 13 additions & 0 deletions cpp/src/parquet/arrow/schema.cc
@@ -564,10 +564,23 @@ Status MapToSchemaField(const GroupNode& group, LevelInfo current_levels,
return Status::Invalid("Key-value map node must have 1 or 2 child elements. Found: ",
key_value.field_count());
}

/*
* If Parquet file was written by Flink, key type of map column is allowed to be optional, like this:
* optional group event_info (MAP) {
* repeated group key_value {
* optional binary key (UTF8);
* optional binary value (UTF8);
* }
* }
*
* Refer to: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/types/#constructured-data-types
const Node& key_node = *key_value.field(0);
if (!key_node.is_required()) {
return Status::Invalid("Map keys must be annotated as required.");
}
*/

// Arrow doesn't support 1 column maps (i.e. Sets). The options are to either
// make the values column nullable, or process the map as a list. We choose the latter
// as it is simpler.
2 changes: 1 addition & 1 deletion cpp/src/parquet/encoding.cc
@@ -2721,7 +2721,7 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
T min_delta_;
uint32_t mini_block_idx_;
std::shared_ptr<ResizableBuffer> delta_bit_widths_;
int delta_bit_width_;
int delta_bit_width_ = 0;

T last_value_;
};
