Skip to content

Commit

Permalink
[improve](table function) opt explode/explode_map/explode_json table …
Browse files Browse the repository at this point in the history
…function (apache#33904)

before get_value, it's will insert one row into column once,
now could insert many result into column once, could reduce some virtual function call
  • Loading branch information
zhangstar333 authored Apr 24, 2024
1 parent 5faa666 commit c2c250c
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 232 deletions.
12 changes: 1 addition & 11 deletions be/src/vec/exprs/table_function/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,7 @@ class TableFunction {
}

virtual void get_value(MutableColumnPtr& column) = 0;

virtual int get_value(MutableColumnPtr& column, int max_step) {
max_step = std::max(1, std::min(max_step, (int)(_cur_size - _cur_offset)));
int i = 0;
// TODO: this for loop maybe could refactor, and call once get_value function, it's could insert into max_step value once
for (; i < max_step && !eos(); i++) {
get_value(column);
forward();
}
return i;
}
virtual int get_value(MutableColumnPtr& column, int max_step) = 0;

virtual Status close() { return Status::OK(); }

Expand Down
27 changes: 9 additions & 18 deletions be/src/vec/exprs/table_function/table_function_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <gen_cpp/Types_types.h>

#include <memory>
#include <string_view>
#include <utility>

Expand All @@ -40,31 +41,21 @@ struct TableFunctionCreator {
std::unique_ptr<TableFunction> operator()() { return TableFunctionType::create_unique(); }
};

template <>
struct TableFunctionCreator<VExplodeJsonArrayTableFunction> {
ExplodeJsonArrayType type;
std::unique_ptr<TableFunction> operator()() const {
return VExplodeJsonArrayTableFunction::create_unique(type);
template <typename DataImpl>
struct VExplodeJsonArrayCreator {
std::unique_ptr<TableFunction> operator()() {
return VExplodeJsonArrayTableFunction<DataImpl>::create_unique();
}
};

inline auto VExplodeJsonArrayIntCreator =
TableFunctionCreator<VExplodeJsonArrayTableFunction> {ExplodeJsonArrayType::INT};
inline auto VExplodeJsonArrayDoubleCreator =
TableFunctionCreator<VExplodeJsonArrayTableFunction> {ExplodeJsonArrayType::DOUBLE};
inline auto VExplodeJsonArrayStringCreator =
TableFunctionCreator<VExplodeJsonArrayTableFunction> {ExplodeJsonArrayType::STRING};
inline auto VExplodeJsonArrayJsonCreator =
TableFunctionCreator<VExplodeJsonArrayTableFunction> {ExplodeJsonArrayType::JSON};

const std::unordered_map<std::string, std::function<std::unique_ptr<TableFunction>()>>
TableFunctionFactory::_function_map {
{"explode_split", TableFunctionCreator<VExplodeSplitTableFunction>()},
{"explode_numbers", TableFunctionCreator<VExplodeNumbersTableFunction>()},
{"explode_json_array_int", VExplodeJsonArrayIntCreator},
{"explode_json_array_double", VExplodeJsonArrayDoubleCreator},
{"explode_json_array_string", VExplodeJsonArrayStringCreator},
{"explode_json_array_json", VExplodeJsonArrayJsonCreator},
{"explode_json_array_int", VExplodeJsonArrayCreator<ParsedDataInt>()},
{"explode_json_array_double", VExplodeJsonArrayCreator<ParsedDataDouble>()},
{"explode_json_array_string", VExplodeJsonArrayCreator<ParsedDataString>()},
{"explode_json_array_json", VExplodeJsonArrayCreator<ParsedDataJSON>()},
{"explode_bitmap", TableFunctionCreator<VExplodeBitmapTableFunction>()},
{"explode_map", TableFunctionCreator<VExplodeMapTableFunction> {}},
{"explode", TableFunctionCreator<VExplodeTableFunction> {}}};
Expand Down
25 changes: 24 additions & 1 deletion be/src/vec/exprs/table_function/vexplode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ Status VExplodeTableFunction::process_init(Block* block, RuntimeState* state) {

_array_column =
block->get_by_position(value_column_idx).column->convert_to_full_column_if_const();

if (!extract_column_array_info(*_array_column, _detail)) {
return Status::NotSupported("column type {} not supported now",
block->get_by_position(value_column_idx).column->get_name());
Expand Down Expand Up @@ -90,4 +89,28 @@ void VExplodeTableFunction::get_value(MutableColumnPtr& column) {
}
}

int VExplodeTableFunction::get_value(MutableColumnPtr& column, int max_step) {
max_step = std::min(max_step, (int)(_cur_size - _cur_offset));
size_t pos = _array_offset + _cur_offset;
if (current_empty()) {
column->insert_default();
max_step = 1;
} else {
if (_is_nullable) {
auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
auto nested_column = nullable_column->get_nested_column_ptr();
auto* nullmap_column =
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
nested_column->insert_range_from(*_detail.nested_col, pos, max_step);
size_t old_size = nullmap_column->size();
nullmap_column->resize(old_size + max_step);
memcpy(nullmap_column->get_data().data() + old_size,
_detail.nested_nullmap_data + pos * sizeof(UInt8), max_step * sizeof(UInt8));
} else {
column->insert_range_from(*_detail.nested_col, pos, max_step);
}
}
forward(max_step);
return max_step;
}
} // namespace doris::vectorized
1 change: 1 addition & 0 deletions be/src/vec/exprs/table_function/vexplode.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class VExplodeTableFunction : public TableFunction {
void process_row(size_t row_idx) override;
void process_close() override;
void get_value(MutableColumnPtr& column) override;
int get_value(MutableColumnPtr& column, int max_step) override;

private:
ColumnPtr _array_column;
Expand Down
213 changes: 56 additions & 157 deletions be/src/vec/exprs/table_function/vexplode_json_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "vec/exprs/table_function/vexplode_json_array.h"

#include <glog/logging.h>
#include <inttypes.h>
#include <rapidjson/rapidjson.h>
#include <stdio.h>
Expand All @@ -25,200 +26,98 @@
#include <limits>

#include "common/status.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/columns_number.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

namespace doris::vectorized {

std::string ParsedData::true_value = "true";
std::string ParsedData::false_value = "false";
auto max_value = std::numeric_limits<int64_t>::max(); //9223372036854775807
auto min_value = std::numeric_limits<int64_t>::min(); //-9223372036854775808

int ParsedData::set_output(ExplodeJsonArrayType type, rapidjson::Document& document) {
int size = document.GetArray().Size();
switch (type) {
case ExplodeJsonArrayType::INT: {
_data.resize(size);
_backup_int.resize(size);
int i = 0;
for (auto& v : document.GetArray()) {
if (v.IsInt64()) {
_backup_int[i] = v.GetInt64();
_data[i] = &_backup_int[i];
} else if (v.IsUint64()) {
auto value = v.GetUint64();
if (value > max_value) {
_backup_int[i] = max_value;
} else {
_backup_int[i] = value;
}
_data[i] = &_backup_int[i];
} else if (v.IsDouble()) {
auto value = v.GetDouble();
if (value > max_value) {
_backup_int[i] = max_value;
} else if (value < min_value) {
_backup_int[i] = min_value;
} else {
_backup_int[i] = long(value);
}
_data[i] = &_backup_int[i];
} else {
_data[i] = nullptr;
}
++i;
}
break;
}
case ExplodeJsonArrayType::DOUBLE: {
_data.resize(size);
_backup_double.resize(size);
int i = 0;
for (auto& v : document.GetArray()) {
if (v.IsDouble()) {
_backup_double[i] = v.GetDouble();
_data[i] = &_backup_double[i];
} else {
_data[i] = nullptr;
}
++i;
}
break;
}
case ExplodeJsonArrayType::STRING: {
_data_string.clear();
_backup_string.clear();
_string_nulls.clear();
int32_t wbytes = 0;
for (auto& v : document.GetArray()) {
switch (v.GetType()) {
case rapidjson::Type::kStringType:
_backup_string.emplace_back(v.GetString(), v.GetStringLength());
_string_nulls.push_back(false);
// do not set _data_string here.
// Because the address of the string stored in `_backup_string` may
// change each time `emplace_back()` is called.
break;
case rapidjson::Type::kNumberType:
if (v.IsUint()) {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%u", v.GetUint());
} else if (v.IsInt()) {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%d", v.GetInt());
} else if (v.IsUint64()) {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%" PRIu64, v.GetUint64());
} else if (v.IsInt64()) {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%" PRId64, v.GetInt64());
} else {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%f", v.GetDouble());
}
_backup_string.emplace_back(tmp_buf, wbytes);
_string_nulls.push_back(false);
// do not set _data_string here.
// Because the address of the string stored in `_backup_string` may
// change each time `emplace_back()` is called.
break;
case rapidjson::Type::kFalseType:
_backup_string.emplace_back(true_value);
_string_nulls.push_back(false);
break;
case rapidjson::Type::kTrueType:
_backup_string.emplace_back(false_value);
_string_nulls.push_back(false);
break;
case rapidjson::Type::kNullType:
_backup_string.emplace_back();
_string_nulls.push_back(true);
break;
default:
_backup_string.emplace_back();
_string_nulls.push_back(true);
break;
}
}
// Must set _data_string at the end, so that we can
// save the real addr of string in `_backup_string` to `_data_string`.
for (auto& str : _backup_string) {
_data_string.emplace_back(str);
}
break;
}
case ExplodeJsonArrayType::JSON: {
_data_string.clear();
_backup_string.clear();
_string_nulls.clear();
for (auto& v : document.GetArray()) {
if (v.IsObject()) {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
v.Accept(writer);
_backup_string.emplace_back(buffer.GetString(), buffer.GetSize());
_string_nulls.push_back(false);
} else {
_data_string.push_back({});
_string_nulls.push_back(true);
}
}
// Must set _data_string at the end, so that we can
// save the real addr of string in `_backup_string` to `_data_string`.
for (auto& str : _backup_string) {
_data_string.emplace_back(str);
}
break;
}
default:
CHECK(false) << type;
break;
}
return size;
}

VExplodeJsonArrayTableFunction::VExplodeJsonArrayTableFunction(ExplodeJsonArrayType type)
: _type(type) {
template <typename DataImpl>
VExplodeJsonArrayTableFunction<DataImpl>::VExplodeJsonArrayTableFunction() : TableFunction() {
_fn_name = "vexplode_json_array";
}

Status VExplodeJsonArrayTableFunction::process_init(Block* block, RuntimeState* state) {
template <typename DataImpl>
Status VExplodeJsonArrayTableFunction<DataImpl>::process_init(Block* block, RuntimeState* state) {
CHECK(_expr_context->root()->children().size() == 1)
<< _expr_context->root()->children().size();

int text_column_idx = -1;
RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute(_expr_context.get(), block,
&text_column_idx));
_text_column = block->get_by_position(text_column_idx).column;

return Status::OK();
}

void VExplodeJsonArrayTableFunction::process_row(size_t row_idx) {
template <typename DataImpl>
void VExplodeJsonArrayTableFunction<DataImpl>::process_row(size_t row_idx) {
TableFunction::process_row(row_idx);

StringRef text = _text_column->get_data_at(row_idx);
if (text.data != nullptr) {
rapidjson::Document document;
document.Parse(text.data, text.size);
if (!document.HasParseError() && document.IsArray() && document.GetArray().Size()) {
_cur_size = _parsed_data.set_output(_type, document);
_cur_size = _parsed_data.set_output(document, document.GetArray().Size());
}
}
}

void VExplodeJsonArrayTableFunction::process_close() {
template <typename DataImpl>
void VExplodeJsonArrayTableFunction<DataImpl>::process_close() {
_text_column = nullptr;
_parsed_data.reset();
}

template <typename DataImpl>
void VExplodeJsonArrayTableFunction<DataImpl>::get_value(MutableColumnPtr& column) {
if (current_empty()) {
column->insert_default();
} else {
insert_values_into_column(column, 1);
}
}

void VExplodeJsonArrayTableFunction::get_value(MutableColumnPtr& column) {
if (current_empty() || _parsed_data.get_value(_type, _cur_offset, true) == nullptr) {
template <typename DataImpl>
int VExplodeJsonArrayTableFunction<DataImpl>::get_value(MutableColumnPtr& column, int max_step) {
max_step = std::min(max_step, (int)(_cur_size - _cur_offset));
if (current_empty()) {
column->insert_default();
max_step = 1;
} else {
insert_values_into_column(column, max_step);
}
forward(max_step);
return max_step;
}

template <typename DataImpl>
void VExplodeJsonArrayTableFunction<DataImpl>::insert_values_into_column(MutableColumnPtr& column,
int max_step) {
if (_is_nullable) {
auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
auto nested_column = nullable_column->get_nested_column_ptr();

_parsed_data.insert_result_from_parsed_data(nested_column, max_step, _cur_offset);

auto* nullmap_column =
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
size_t old_size = nullmap_column->size();
nullmap_column->resize(old_size + max_step);
memcpy(nullmap_column->get_data().data() + old_size,
_parsed_data.get_null_flag_address(_cur_offset), max_step * sizeof(UInt8));
} else {
column->insert_data((char*)_parsed_data.get_value(_type, _cur_offset, true),
_parsed_data.get_value_length(_type, _cur_offset));
_parsed_data.insert_result_from_parsed_data(column, max_step, _cur_offset);
}
}

template class VExplodeJsonArrayTableFunction<ParsedDataInt>;
template class VExplodeJsonArrayTableFunction<ParsedDataDouble>;
template class VExplodeJsonArrayTableFunction<ParsedDataString>;
template class VExplodeJsonArrayTableFunction<ParsedDataJSON>;

} // namespace doris::vectorized
Loading

0 comments on commit c2c250c

Please sign in to comment.