diff --git a/be/src/vec/exprs/table_function/table_function.h b/be/src/vec/exprs/table_function/table_function.h index c817067470a22a..97a0a5785e21cc 100644 --- a/be/src/vec/exprs/table_function/table_function.h +++ b/be/src/vec/exprs/table_function/table_function.h @@ -54,17 +54,7 @@ class TableFunction { } virtual void get_value(MutableColumnPtr& column) = 0; - - virtual int get_value(MutableColumnPtr& column, int max_step) { - max_step = std::max(1, std::min(max_step, (int)(_cur_size - _cur_offset))); - int i = 0; - // TODO: this for loop maybe could refactor, and call once get_value function, it's could insert into max_step value once - for (; i < max_step && !eos(); i++) { - get_value(column); - forward(); - } - return i; - } + virtual int get_value(MutableColumnPtr& column, int max_step) = 0; virtual Status close() { return Status::OK(); } diff --git a/be/src/vec/exprs/table_function/table_function_factory.cpp b/be/src/vec/exprs/table_function/table_function_factory.cpp index 29b201b59479d5..79e8a16f6abf92 100644 --- a/be/src/vec/exprs/table_function/table_function_factory.cpp +++ b/be/src/vec/exprs/table_function/table_function_factory.cpp @@ -19,6 +19,7 @@ #include +#include #include #include @@ -40,31 +41,21 @@ struct TableFunctionCreator { std::unique_ptr operator()() { return TableFunctionType::create_unique(); } }; -template <> -struct TableFunctionCreator { - ExplodeJsonArrayType type; - std::unique_ptr operator()() const { - return VExplodeJsonArrayTableFunction::create_unique(type); +template +struct VExplodeJsonArrayCreator { + std::unique_ptr operator()() { + return VExplodeJsonArrayTableFunction::create_unique(); } }; -inline auto VExplodeJsonArrayIntCreator = - TableFunctionCreator {ExplodeJsonArrayType::INT}; -inline auto VExplodeJsonArrayDoubleCreator = - TableFunctionCreator {ExplodeJsonArrayType::DOUBLE}; -inline auto VExplodeJsonArrayStringCreator = - TableFunctionCreator {ExplodeJsonArrayType::STRING}; -inline auto 
VExplodeJsonArrayJsonCreator = - TableFunctionCreator {ExplodeJsonArrayType::JSON}; - const std::unordered_map()>> TableFunctionFactory::_function_map { {"explode_split", TableFunctionCreator()}, {"explode_numbers", TableFunctionCreator()}, - {"explode_json_array_int", VExplodeJsonArrayIntCreator}, - {"explode_json_array_double", VExplodeJsonArrayDoubleCreator}, - {"explode_json_array_string", VExplodeJsonArrayStringCreator}, - {"explode_json_array_json", VExplodeJsonArrayJsonCreator}, + {"explode_json_array_int", VExplodeJsonArrayCreator()}, + {"explode_json_array_double", VExplodeJsonArrayCreator()}, + {"explode_json_array_string", VExplodeJsonArrayCreator()}, + {"explode_json_array_json", VExplodeJsonArrayCreator()}, {"explode_bitmap", TableFunctionCreator()}, {"explode_map", TableFunctionCreator {}}, {"explode", TableFunctionCreator {}}}; diff --git a/be/src/vec/exprs/table_function/vexplode.cpp b/be/src/vec/exprs/table_function/vexplode.cpp index b505339ea7c915..58764a44fb1516 100644 --- a/be/src/vec/exprs/table_function/vexplode.cpp +++ b/be/src/vec/exprs/table_function/vexplode.cpp @@ -47,7 +47,6 @@ Status VExplodeTableFunction::process_init(Block* block, RuntimeState* state) { _array_column = block->get_by_position(value_column_idx).column->convert_to_full_column_if_const(); - if (!extract_column_array_info(*_array_column, _detail)) { return Status::NotSupported("column type {} not supported now", block->get_by_position(value_column_idx).column->get_name()); @@ -90,4 +89,28 @@ void VExplodeTableFunction::get_value(MutableColumnPtr& column) { } } +int VExplodeTableFunction::get_value(MutableColumnPtr& column, int max_step) { + max_step = std::min(max_step, (int)(_cur_size - _cur_offset)); + size_t pos = _array_offset + _cur_offset; + if (current_empty()) { + column->insert_default(); + max_step = 1; + } else { + if (_is_nullable) { + auto* nullable_column = assert_cast(column.get()); + auto nested_column = nullable_column->get_nested_column_ptr(); + auto* 
nullmap_column = + assert_cast(nullable_column->get_null_map_column_ptr().get()); + nested_column->insert_range_from(*_detail.nested_col, pos, max_step); + size_t old_size = nullmap_column->size(); + nullmap_column->resize(old_size + max_step); + memcpy(nullmap_column->get_data().data() + old_size, + _detail.nested_nullmap_data + pos * sizeof(UInt8), max_step * sizeof(UInt8)); + } else { + column->insert_range_from(*_detail.nested_col, pos, max_step); + } + } + forward(max_step); + return max_step; +} } // namespace doris::vectorized diff --git a/be/src/vec/exprs/table_function/vexplode.h b/be/src/vec/exprs/table_function/vexplode.h index d5d84a70e2edc4..86f9f75869d9c9 100644 --- a/be/src/vec/exprs/table_function/vexplode.h +++ b/be/src/vec/exprs/table_function/vexplode.h @@ -44,6 +44,7 @@ class VExplodeTableFunction : public TableFunction { void process_row(size_t row_idx) override; void process_close() override; void get_value(MutableColumnPtr& column) override; + int get_value(MutableColumnPtr& column, int max_step) override; private: ColumnPtr _array_column; diff --git a/be/src/vec/exprs/table_function/vexplode_json_array.cpp b/be/src/vec/exprs/table_function/vexplode_json_array.cpp index fbff29390e942d..ca98c2c97593b9 100644 --- a/be/src/vec/exprs/table_function/vexplode_json_array.cpp +++ b/be/src/vec/exprs/table_function/vexplode_json_array.cpp @@ -17,6 +17,7 @@ #include "vec/exprs/table_function/vexplode_json_array.h" +#include #include #include #include @@ -25,165 +26,23 @@ #include #include "common/status.h" -#include "rapidjson/stringbuffer.h" -#include "rapidjson/writer.h" #include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/columns_number.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" namespace doris::vectorized { - -std::string ParsedData::true_value = "true"; -std::string 
ParsedData::false_value = "false"; -auto max_value = std::numeric_limits::max(); //9223372036854775807 -auto min_value = std::numeric_limits::min(); //-9223372036854775808 - -int ParsedData::set_output(ExplodeJsonArrayType type, rapidjson::Document& document) { - int size = document.GetArray().Size(); - switch (type) { - case ExplodeJsonArrayType::INT: { - _data.resize(size); - _backup_int.resize(size); - int i = 0; - for (auto& v : document.GetArray()) { - if (v.IsInt64()) { - _backup_int[i] = v.GetInt64(); - _data[i] = &_backup_int[i]; - } else if (v.IsUint64()) { - auto value = v.GetUint64(); - if (value > max_value) { - _backup_int[i] = max_value; - } else { - _backup_int[i] = value; - } - _data[i] = &_backup_int[i]; - } else if (v.IsDouble()) { - auto value = v.GetDouble(); - if (value > max_value) { - _backup_int[i] = max_value; - } else if (value < min_value) { - _backup_int[i] = min_value; - } else { - _backup_int[i] = long(value); - } - _data[i] = &_backup_int[i]; - } else { - _data[i] = nullptr; - } - ++i; - } - break; - } - case ExplodeJsonArrayType::DOUBLE: { - _data.resize(size); - _backup_double.resize(size); - int i = 0; - for (auto& v : document.GetArray()) { - if (v.IsDouble()) { - _backup_double[i] = v.GetDouble(); - _data[i] = &_backup_double[i]; - } else { - _data[i] = nullptr; - } - ++i; - } - break; - } - case ExplodeJsonArrayType::STRING: { - _data_string.clear(); - _backup_string.clear(); - _string_nulls.clear(); - int32_t wbytes = 0; - for (auto& v : document.GetArray()) { - switch (v.GetType()) { - case rapidjson::Type::kStringType: - _backup_string.emplace_back(v.GetString(), v.GetStringLength()); - _string_nulls.push_back(false); - // do not set _data_string here. - // Because the address of the string stored in `_backup_string` may - // change each time `emplace_back()` is called. 
- break; - case rapidjson::Type::kNumberType: - if (v.IsUint()) { - wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%u", v.GetUint()); - } else if (v.IsInt()) { - wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%d", v.GetInt()); - } else if (v.IsUint64()) { - wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%" PRIu64, v.GetUint64()); - } else if (v.IsInt64()) { - wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%" PRId64, v.GetInt64()); - } else { - wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%f", v.GetDouble()); - } - _backup_string.emplace_back(tmp_buf, wbytes); - _string_nulls.push_back(false); - // do not set _data_string here. - // Because the address of the string stored in `_backup_string` may - // change each time `emplace_back()` is called. - break; - case rapidjson::Type::kFalseType: - _backup_string.emplace_back(true_value); - _string_nulls.push_back(false); - break; - case rapidjson::Type::kTrueType: - _backup_string.emplace_back(false_value); - _string_nulls.push_back(false); - break; - case rapidjson::Type::kNullType: - _backup_string.emplace_back(); - _string_nulls.push_back(true); - break; - default: - _backup_string.emplace_back(); - _string_nulls.push_back(true); - break; - } - } - // Must set _data_string at the end, so that we can - // save the real addr of string in `_backup_string` to `_data_string`. - for (auto& str : _backup_string) { - _data_string.emplace_back(str); - } - break; - } - case ExplodeJsonArrayType::JSON: { - _data_string.clear(); - _backup_string.clear(); - _string_nulls.clear(); - for (auto& v : document.GetArray()) { - if (v.IsObject()) { - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - v.Accept(writer); - _backup_string.emplace_back(buffer.GetString(), buffer.GetSize()); - _string_nulls.push_back(false); - } else { - _data_string.push_back({}); - _string_nulls.push_back(true); - } - } - // Must set _data_string at the end, so that we can - // save the real addr of string in `_backup_string` to `_data_string`. 
- for (auto& str : _backup_string) { - _data_string.emplace_back(str); - } - break; - } - default: - CHECK(false) << type; - break; - } - return size; -} - -VExplodeJsonArrayTableFunction::VExplodeJsonArrayTableFunction(ExplodeJsonArrayType type) - : _type(type) { +template +VExplodeJsonArrayTableFunction::VExplodeJsonArrayTableFunction() : TableFunction() { _fn_name = "vexplode_json_array"; } -Status VExplodeJsonArrayTableFunction::process_init(Block* block, RuntimeState* state) { +template +Status VExplodeJsonArrayTableFunction::process_init(Block* block, RuntimeState* state) { CHECK(_expr_context->root()->children().size() == 1) << _expr_context->root()->children().size(); @@ -191,11 +50,11 @@ Status VExplodeJsonArrayTableFunction::process_init(Block* block, RuntimeState* RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute(_expr_context.get(), block, &text_column_idx)); _text_column = block->get_by_position(text_column_idx).column; - return Status::OK(); } -void VExplodeJsonArrayTableFunction::process_row(size_t row_idx) { +template +void VExplodeJsonArrayTableFunction::process_row(size_t row_idx) { TableFunction::process_row(row_idx); StringRef text = _text_column->get_data_at(row_idx); @@ -203,22 +62,62 @@ void VExplodeJsonArrayTableFunction::process_row(size_t row_idx) { rapidjson::Document document; document.Parse(text.data, text.size); if (!document.HasParseError() && document.IsArray() && document.GetArray().Size()) { - _cur_size = _parsed_data.set_output(_type, document); + _cur_size = _parsed_data.set_output(document, document.GetArray().Size()); } } } -void VExplodeJsonArrayTableFunction::process_close() { +template +void VExplodeJsonArrayTableFunction::process_close() { _text_column = nullptr; + _parsed_data.reset(); +} + +template +void VExplodeJsonArrayTableFunction::get_value(MutableColumnPtr& column) { + if (current_empty()) { + column->insert_default(); + } else { + insert_values_into_column(column, 1); + } } -void 
VExplodeJsonArrayTableFunction::get_value(MutableColumnPtr& column) { - if (current_empty() || _parsed_data.get_value(_type, _cur_offset, true) == nullptr) { +template +int VExplodeJsonArrayTableFunction::get_value(MutableColumnPtr& column, int max_step) { + max_step = std::min(max_step, (int)(_cur_size - _cur_offset)); + if (current_empty()) { column->insert_default(); + max_step = 1; + } else { + insert_values_into_column(column, max_step); + } + forward(max_step); + return max_step; +} + +template +void VExplodeJsonArrayTableFunction::insert_values_into_column(MutableColumnPtr& column, + int max_step) { + if (_is_nullable) { + auto* nullable_column = assert_cast(column.get()); + auto nested_column = nullable_column->get_nested_column_ptr(); + + _parsed_data.insert_result_from_parsed_data(nested_column, max_step, _cur_offset); + + auto* nullmap_column = + assert_cast(nullable_column->get_null_map_column_ptr().get()); + size_t old_size = nullmap_column->size(); + nullmap_column->resize(old_size + max_step); + memcpy(nullmap_column->get_data().data() + old_size, + _parsed_data.get_null_flag_address(_cur_offset), max_step * sizeof(UInt8)); } else { - column->insert_data((char*)_parsed_data.get_value(_type, _cur_offset, true), - _parsed_data.get_value_length(_type, _cur_offset)); + _parsed_data.insert_result_from_parsed_data(column, max_step, _cur_offset); } } +template class VExplodeJsonArrayTableFunction; +template class VExplodeJsonArrayTableFunction; +template class VExplodeJsonArrayTableFunction; +template class VExplodeJsonArrayTableFunction; + } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exprs/table_function/vexplode_json_array.h b/be/src/vec/exprs/table_function/vexplode_json_array.h index 42b3cba299437a..b054446b0c0d4b 100644 --- a/be/src/vec/exprs/table_function/vexplode_json_array.h +++ b/be/src/vec/exprs/table_function/vexplode_json_array.h @@ -19,8 +19,6 @@ #include #include -#include -#include #include #include @@ 
-28,76 +26,222 @@ #include "common/status.h" #include "gutil/integral_types.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" #include "vec/common/string_ref.h" +#include "vec/core/types.h" #include "vec/data_types/data_type.h" #include "vec/exprs/table_function/table_function.h" -namespace doris { -namespace vectorized { -class Block; -} // namespace vectorized -} // namespace doris - namespace doris::vectorized { -enum ExplodeJsonArrayType { INT = 0, DOUBLE, STRING, JSON }; - +template struct ParsedData { - static std::string true_value; - static std::string false_value; - - // The number parsed from json array - // the `_backup` saved the real number entity. - std::vector _data; - std::vector _data_string; - std::vector _backup_int; - std::vector _backup_double; - std::vector _backup_string; - std::vector _string_nulls; - char tmp_buf[128] = {0}; + ParsedData() = default; + virtual ~ParsedData() = default; + virtual void reset() { + _backup_data.clear(); + _values_null_flag.clear(); + } + virtual int set_output(rapidjson::Document& document, int value_size) = 0; + virtual void insert_result_from_parsed_data(MutableColumnPtr& column, int max_step, + int64_t cur_offset) = 0; + const char* get_null_flag_address(int cur_offset) { + return reinterpret_cast(_values_null_flag.data() + cur_offset); + } + std::vector _values_null_flag; + std::vector _backup_data; +}; - void* get_value(ExplodeJsonArrayType type, int64_t offset, bool real = false) { - switch (type) { - case ExplodeJsonArrayType::INT: - case ExplodeJsonArrayType::DOUBLE: - return _data[offset]; - case ExplodeJsonArrayType::JSON: - case ExplodeJsonArrayType::STRING: - return _string_nulls[offset] ? nullptr - : real ? 
reinterpret_cast(_backup_string[offset].data()) - : &_data_string[offset]; - default: - return nullptr; +struct ParsedDataInt : public ParsedData { + static constexpr auto MAX_VALUE = std::numeric_limits::max(); //9223372036854775807 + static constexpr auto MIN_VALUE = std::numeric_limits::min(); //-9223372036854775808 + + int set_output(rapidjson::Document& document, int value_size) override { + _values_null_flag.assign(value_size, 0); // assign, not resize: resize would keep null flags left over from the previous row + _backup_data.resize(value_size); + int i = 0; + for (auto& v : document.GetArray()) { + if (v.IsInt64()) { + _backup_data[i] = v.GetInt64(); + } else if (v.IsUint64()) { + auto value = v.GetUint64(); + if (value > MAX_VALUE) { + _backup_data[i] = MAX_VALUE; + } else { + _backup_data[i] = value; + } + } else if (v.IsDouble()) { + auto value = v.GetDouble(); + if (value >= MAX_VALUE) { // >= because MAX_VALUE becomes 2^63 as a double, and long(2^63) would overflow + _backup_data[i] = MAX_VALUE; + } else if (value < MIN_VALUE) { + _backup_data[i] = MIN_VALUE; + } else { + _backup_data[i] = long(value); + } + } else { + _values_null_flag[i] = 1; + _backup_data[i] = 0; + } + ++i; } + return value_size; + } + void insert_result_from_parsed_data(MutableColumnPtr& column, int max_step, + int64_t cur_offset) override { + assert_cast(column.get()) + ->insert_many_raw_data( + reinterpret_cast(_backup_data.data() + cur_offset), max_step); } +}; - int64 get_value_length(ExplodeJsonArrayType type, int64_t offset) { - if ((type == ExplodeJsonArrayType::STRING || type == ExplodeJsonArrayType::JSON) && - !_string_nulls[offset]) { - return _backup_string[offset].size(); +struct ParsedDataDouble : public ParsedData { + int set_output(rapidjson::Document& document, int value_size) override { + _values_null_flag.assign(value_size, 0); // assign, not resize: resize would keep null flags left over from the previous row + _backup_data.resize(value_size); + int i = 0; + for (auto& v : document.GetArray()) { + if (v.IsDouble()) { + _backup_data[i] = v.GetDouble(); + } else { + _backup_data[i] = 0; + _values_null_flag[i] = 1; + } + ++i; } - return 0; + return value_size; + } + void 
insert_result_from_parsed_data(MutableColumnPtr& column, int max_step, + int64_t cur_offset) override { + assert_cast(column.get()) + ->insert_many_raw_data( + reinterpret_cast(_backup_data.data() + cur_offset), max_step); + } +}; + +struct ParsedDataStringBase : public ParsedData { + void insert_result_from_parsed_data(MutableColumnPtr& column, int max_step, + int64_t cur_offset) override { + assert_cast(column.get()) + ->insert_many_strings(_data_string_ref.data() + cur_offset, max_step); + } + void reset() override { + ParsedData::reset(); + _data_string_ref.clear(); } - int set_output(ExplodeJsonArrayType type, rapidjson::Document& document); + static constexpr const char* TRUE_VALUE = "true"; + static constexpr const char* FALSE_VALUE = "false"; + std::vector _data_string_ref; + char tmp_buf[128] = {0}; +}; + +struct ParsedDataString : public ParsedDataStringBase { + int set_output(rapidjson::Document& document, int value_size) override { + _data_string_ref.clear(); + _backup_data.clear(); + _values_null_flag.clear(); + int32_t wbytes = 0; + for (auto& v : document.GetArray()) { + switch (v.GetType()) { + case rapidjson::Type::kStringType: { + _backup_data.emplace_back(v.GetString(), v.GetStringLength()); + _values_null_flag.emplace_back(false); + break; + // do not set _data_string here. + // Because the address of the string stored in `_backup_data` may + // change each time `emplace_back()` is called. 
+ } + case rapidjson::Type::kNumberType: { + if (v.IsUint()) { + wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%u", v.GetUint()); + } else if (v.IsInt()) { + wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%d", v.GetInt()); + } else if (v.IsUint64()) { + wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%" PRIu64, v.GetUint64()); + } else if (v.IsInt64()) { + wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%" PRId64, v.GetInt64()); + } else { + wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%f", v.GetDouble()); + } + _backup_data.emplace_back(tmp_buf, wbytes < 0 ? 0 : (static_cast<size_t>(wbytes) < sizeof(tmp_buf) ? wbytes : sizeof(tmp_buf) - 1)); // snprintf returns the untruncated length (e.g. "%f" of 1e308), so clamp to what tmp_buf actually holds + _values_null_flag.emplace_back(false); + // do not set _data_string here. + // Because the address of the string stored in `_backup_data` may + // change each time `emplace_back()` is called. + break; + } + case rapidjson::Type::kFalseType: + _backup_data.emplace_back(FALSE_VALUE); // JSON false -> "false" (was swapped with the true branch) + _values_null_flag.emplace_back(false); + break; + case rapidjson::Type::kTrueType: + _backup_data.emplace_back(TRUE_VALUE); // JSON true -> "true" + _values_null_flag.emplace_back(false); + break; + case rapidjson::Type::kNullType: + _backup_data.emplace_back(); + _values_null_flag.emplace_back(true); + break; + default: + _backup_data.emplace_back(); + _values_null_flag.emplace_back(true); + break; + } + } + // Must set _data_string at the end, so that we can + // save the real addr of string in `_backup_data` to `_data_string`. 
+ for (auto& str : _backup_data) { + _data_string_ref.emplace_back(str.data(), str.length()); + } + return value_size; + } }; +struct ParsedDataJSON : public ParsedDataStringBase { + int set_output(rapidjson::Document& document, int value_size) override { + _data_string_ref.clear(); + _backup_data.clear(); + _values_null_flag.clear(); + for (auto& v : document.GetArray()) { + if (v.IsObject()) { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + v.Accept(writer); + _backup_data.emplace_back(buffer.GetString(), buffer.GetSize()); + _values_null_flag.emplace_back(false); + } else { + _backup_data.emplace_back(); + _values_null_flag.emplace_back(true); + } + } + // Must set _data_string at the end, so that we can + // save the real addr of string in `_backup_data` to `_data_string`. + for (auto& str : _backup_data) { + _data_string_ref.emplace_back(str); + } + return value_size; + } +}; + +template class VExplodeJsonArrayTableFunction final : public TableFunction { - ENABLE_FACTORY_CREATOR(VExplodeJsonArrayTableFunction); + ENABLE_FACTORY_CREATOR(VExplodeJsonArrayTableFunction); public: - VExplodeJsonArrayTableFunction(ExplodeJsonArrayType type); + VExplodeJsonArrayTableFunction(); ~VExplodeJsonArrayTableFunction() override = default; Status process_init(Block* block, RuntimeState* state) override; void process_row(size_t row_idx) override; void process_close() override; void get_value(MutableColumnPtr& column) override; + int get_value(MutableColumnPtr& column, int max_step) override; + void insert_values_into_column(MutableColumnPtr& column, int max_step); private: - ParsedData _parsed_data; - ExplodeJsonArrayType _type; - + DataImpl _parsed_data; ColumnPtr _text_column; }; diff --git a/be/src/vec/exprs/table_function/vexplode_map.cpp b/be/src/vec/exprs/table_function/vexplode_map.cpp index 923316fd282309..cf4eb2b701561c 100644 --- a/be/src/vec/exprs/table_function/vexplode_map.cpp +++ b/be/src/vec/exprs/table_function/vexplode_map.cpp @@ -122,4 
+122,38 @@ void VExplodeMapTableFunction::get_value(MutableColumnPtr& column) { ret->get_column(1).insert_from(_map_detail.map_col->get_values(), pos); } +int VExplodeMapTableFunction::get_value(MutableColumnPtr& column, int max_step) { + max_step = std::min(max_step, (int)(_cur_size - _cur_offset)); + size_t pos = _collection_offset + _cur_offset; + if (current_empty()) { + column->insert_default(); + max_step = 1; + } else { + ColumnStruct* struct_column = nullptr; + if (_is_nullable) { + auto* nullable_column = assert_cast(column.get()); + struct_column = + assert_cast(nullable_column->get_nested_column_ptr().get()); + auto* nullmap_column = + assert_cast(nullable_column->get_null_map_column_ptr().get()); + // This branch only runs when the current row is not empty (current_empty() is + // handled above), so every one of the max_step emitted entries is non-NULL + // and the null map can be extended with defaults directly. + nullmap_column->insert_many_defaults(max_step); + } else { + struct_column = assert_cast(column.get()); + } + if (!struct_column || struct_column->tuple_size() != 2) { + throw Exception(ErrorCode::INTERNAL_ERROR, + "only support map column explode to two column, but given: {}", + struct_column ? struct_column->tuple_size() : 0); // do not dereference a null struct_column while reporting it + } + struct_column->get_column(0).insert_range_from(_map_detail.map_col->get_keys(), pos, + max_step); + struct_column->get_column(1).insert_range_from(_map_detail.map_col->get_values(), pos, + max_step); + } + forward(max_step); + return max_step; +} } // namespace doris::vectorized diff --git a/be/src/vec/exprs/table_function/vexplode_map.h b/be/src/vec/exprs/table_function/vexplode_map.h index 770c06e3bb9c68..969629f1cd306c 100644 --- a/be/src/vec/exprs/table_function/vexplode_map.h +++ b/be/src/vec/exprs/table_function/vexplode_map.h @@ -58,6 +58,7 @@ class VExplodeMapTableFunction : public TableFunction { void process_row(size_t row_idx) override; void process_close() override; void get_value(MutableColumnPtr& column) override; + int get_value(MutableColumnPtr& column, 
int max_step) override; private: ColumnPtr _collection_column;