Skip to content

Commit

Permalink
fix logic for enable_runtime_filter_partition_prune and rename
Browse files Browse the repository at this point in the history
  • Loading branch information
suxiaogang223 committed Feb 8, 2025
1 parent 71eb85c commit 1e8e7af
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 40 deletions.
77 changes: 43 additions & 34 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
_convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerConvertOuputBlockTime", 1);
_runtime_filter_partition_pruning_timer = ADD_TIMER_WITH_LEVEL(
_runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
_local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
_empty_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
Expand Down Expand Up @@ -188,7 +188,7 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
}

// check if the expr is a partition pruning expr
bool VFileScanner::_check_partition_pruning_expr(const VExprSPtr& expr) {
bool VFileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
if (expr->is_slot_ref()) {
auto* slot_ref = static_cast<VSlotRef*>(expr.get());
return _partition_slot_index_map.find(slot_ref->slot_id()) !=
Expand All @@ -198,38 +198,38 @@ bool VFileScanner::_check_partition_pruning_expr(const VExprSPtr& expr) {
return true;
}
return std::ranges::all_of(expr->children(), [this](const auto& child) {
return _check_partition_pruning_expr(child);
return _check_partition_prune_expr(child);
});
}

void VFileScanner::_init_runtime_filter_partition_pruning_ctxs() {
if (_partition_slot_index_map.empty()) {
return;
}
_runtime_filter_partition_pruning_ctxs.clear();
void VFileScanner::_init_runtime_filter_partition_prune_ctxs() {
_runtime_filter_partition_prune_ctxs.clear();
for (auto& conjunct : _conjuncts) {
auto impl = conjunct->root()->get_impl();
// If impl is not null, which means this a conjuncts from runtime filter.
auto expr = impl ? impl : conjunct->root();
if (_check_partition_pruning_expr(expr)) {
_runtime_filter_partition_pruning_ctxs.emplace_back(conjunct);
if (_check_partition_prune_expr(expr)) {
_runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
}
}
}

void VFileScanner::_init_runtime_filter_partition_prune_block() {
// init block with empty column
for (auto const* slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->need_materialize()) {
// should be ignored from reading
continue;
}
_runtime_filter_partition_pruning_block.insert(
_runtime_filter_partition_prune_block.insert(
ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}
}

Status VFileScanner::_process_runtime_filters_partition_pruning(bool& can_filter_all) {
SCOPED_TIMER(_runtime_filter_partition_pruning_timer);
if (_runtime_filter_partition_pruning_ctxs.empty() || _partition_col_descs.empty()) {
Status VFileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
SCOPED_TIMER(_runtime_filter_partition_prune_timer);
if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
return Status::OK();
}
size_t partition_value_column_size = 1;
Expand All @@ -248,8 +248,8 @@ Status VFileScanner::_process_runtime_filters_partition_pruning(bool& can_filter
parititon_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
}

// 2. Fill _runtime_filter_partition_pruning_block from the partition column, then execute conjuncts and filter block.
// 2.1 Fill _runtime_filter_partition_pruning_block from the partition column to match the conjuncts executing.
// 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
// 2.1 Fill _runtime_filter_partition_prune_block from the partition column to match the conjuncts executing.
size_t index = 0;
bool first_column_filled = false;
for (auto const* slot_desc : _real_tuple_desc->slots()) {
Expand All @@ -262,14 +262,14 @@ Status VFileScanner::_process_runtime_filters_partition_pruning(bool& can_filter
auto data_type = slot_desc->get_data_type_ptr();
auto partition_value_column = std::move(parititon_slot_id_to_column[slot_desc->id()]);
if (data_type->is_nullable()) {
_runtime_filter_partition_pruning_block.insert(
_runtime_filter_partition_prune_block.insert(
index, ColumnWithTypeAndName(
ColumnNullable::create(
std::move(partition_value_column),
ColumnUInt8::create(partition_value_column_size, 0)),
data_type, slot_desc->col_name()));
} else {
_runtime_filter_partition_pruning_block.insert(
_runtime_filter_partition_prune_block.insert(
index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
slot_desc->col_name()));
}
Expand All @@ -284,12 +284,12 @@ Status VFileScanner::_process_runtime_filters_partition_pruning(bool& can_filter
if (!first_column_filled) {
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
// The following process may be tricky and time-consuming, but we have no other way.
_runtime_filter_partition_pruning_block.get_by_position(0).column->assume_mutable()->resize(
_runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
partition_value_column_size);
}
IColumn::Filter result_filter(_runtime_filter_partition_pruning_block.rows(), 1);
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_pruning_ctxs, nullptr,
&_runtime_filter_partition_pruning_block,
IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
&_runtime_filter_partition_prune_block,
&result_filter, &can_filter_all));
return Status::OK();
}
Expand Down Expand Up @@ -357,7 +357,11 @@ Status VFileScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
if (_first_scan_range) {
RETURN_IF_ERROR(_init_expr_ctxes());
_init_runtime_filter_partition_pruning_ctxs();
if (_state->query_options().enable_runtime_filter_partition_prune &&
!_partition_slot_index_map.empty()) {
_init_runtime_filter_partition_prune_ctxs();
_init_runtime_filter_partition_prune_block();
}
} else {
// there's no scan range in split source. stop scanner directly.
_scanner_eof = true;
Expand Down Expand Up @@ -876,18 +880,23 @@ Status VFileScanner::_get_next_reader() {
if (!_partition_slot_descs.empty()) {
// we need get partition columns first for runtime filter partition pruning
RETURN_IF_ERROR(_generate_parititon_columns());
if (_push_down_conjuncts.size() < _conjuncts.size()) {
// there are new runtime filters, need to re-init runtime filter partition pruning ctxs
_init_runtime_filter_partition_pruning_ctxs();
}

bool can_filter_all = false;
RETURN_IF_ERROR(_process_runtime_filters_partition_pruning(can_filter_all));
if (can_filter_all) {
// this range can be filtered out by runtime filter partition pruning
// so we need to skip this range
COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
continue;
if (_state->query_options().enable_runtime_filter_partition_prune) {
// if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
// by runtime filter partition prune
if (_push_down_conjuncts.size() < _conjuncts.size()) {
// there are new runtime filters, need to re-init runtime filter partition pruning ctxs
_init_runtime_filter_partition_prune_ctxs();
}

bool can_filter_all = false;
RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
if (can_filter_all) {
// this range can be filtered out by runtime filter partition pruning
// so we need to skip this range
COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
continue;
}
}
}

Expand Down
13 changes: 7 additions & 6 deletions be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ class VFileScanner : public VScanner {
Block _src_block;

VExprContextSPtrs _push_down_conjuncts;
VExprContextSPtrs _runtime_filter_partition_pruning_ctxs;
Block _runtime_filter_partition_pruning_block;
VExprContextSPtrs _runtime_filter_partition_prune_ctxs;
Block _runtime_filter_partition_prune_block;

std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
std::unique_ptr<io::IOContext> _io_ctx;
Expand All @@ -186,7 +186,7 @@ class VFileScanner : public VScanner {
RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
RuntimeProfile::Counter* _pre_filter_timer = nullptr;
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_pruning_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr;
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _not_found_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
Expand Down Expand Up @@ -220,9 +220,10 @@ class VFileScanner : public VScanner {
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
Status _generate_parititon_columns();
Status _generate_missing_columns();
bool _check_partition_pruning_expr(const VExprSPtr& expr);
void _init_runtime_filter_partition_pruning_ctxs();
Status _process_runtime_filters_partition_pruning(bool& is_partition_pruning);
bool _check_partition_prune_expr(const VExprSPtr& expr);
void _init_runtime_filter_partition_prune_ctxs();
void _init_runtime_filter_partition_prune_block();
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
Status _process_conjuncts_for_dict_filter();
Status _process_late_arrival_conjuncts();
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Expand Down

0 comments on commit 1e8e7af

Please sign in to comment.