diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 4c078094b98cd5..aaa271cdd09e05 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -754,19 +754,15 @@ struct LocalExchangeSharedState : public BasicSharedState { dep->set_ready(); } - void add_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) { - mem_counters[channel_id]->update(delta); - if (update_total_mem_usage) { - add_total_mem_usage(delta, channel_id); - } - } + void add_mem_usage(int channel_id, size_t delta) { mem_counters[channel_id]->update(delta); } void sub_mem_usage(int channel_id, size_t delta) { mem_counters[channel_id]->update(-(int64_t)delta); } virtual void add_total_mem_usage(size_t delta, int channel_id) { - if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) { + if (cast_set(mem_usage.fetch_add(delta) + delta) > + config::local_exchange_buffer_mem_limit) { sink_deps.front()->block(); } } @@ -775,7 +771,7 @@ struct LocalExchangeSharedState : public BasicSharedState { auto prev_usage = mem_usage.fetch_sub(delta); DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta << " channel_id: " << channel_id; - if (prev_usage - delta <= config::local_exchange_buffer_mem_limit) { + if (cast_set(prev_usage - delta) <= config::local_exchange_buffer_mem_limit) { sink_deps.front()->set_ready(); } } diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 8b55bd6b44018d..5a4bc7bfe13e67 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -35,35 +35,19 @@ void Exchanger::_enqueue_data_and_set_ready(int channel_id, _enqueue_data_and_set_ready(channel_id, std::move(block)); return; } - size_t allocated_bytes = 0; // PartitionedBlock is used by shuffle exchanger. // PartitionedBlock will be push into multiple queues with different row ranges, so it will be // referenced multiple times. Otherwise, we only ref the block once because it is only push into // one queue. + std::unique_lock l(*_m[channel_id]); if constexpr (std::is_same_v || std::is_same_v) { - allocated_bytes = block.first->data_block.allocated_bytes(); + block.first->record_channel_id(channel_id); } else { - block->ref(1); - allocated_bytes = block->data_block.allocated_bytes(); + block->record_channel_id(channel_id); } - std::unique_lock l(*_m[channel_id]); - local_state->_shared_state->add_mem_usage(channel_id, allocated_bytes, - !std::is_same_v && - !std::is_same_v); if (_data_queue[channel_id].enqueue(std::move(block))) { local_state->_shared_state->set_ready_to_read(channel_id); - } else { - local_state->_shared_state->sub_mem_usage(channel_id, allocated_bytes); - // `enqueue(block)` return false iff this queue's source operator is already closed so we - // just unref the block. - if constexpr (std::is_same_v || - std::is_same_v) { - block.first->unref(local_state->_shared_state, allocated_bytes, channel_id); - } else { - block->unref(local_state->_shared_state, allocated_bytes, channel_id); - DCHECK_EQ(block->ref_value(), 0); - } } } @@ -78,14 +62,10 @@ bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState* local_st if (_data_queue[channel_id].try_dequeue(block)) { if constexpr (std::is_same_v || std::is_same_v) { - local_state->_shared_state->sub_mem_usage(channel_id, - block.first->data_block.allocated_bytes()); + local_state->_shared_state->sub_mem_usage(channel_id, block.first->_allocated_bytes); } else { - local_state->_shared_state->sub_mem_usage(channel_id, - block->data_block.allocated_bytes()); - data_block->swap(block->data_block); - block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id); - DCHECK_EQ(block->ref_value(), 0); + local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); + data_block->swap(block->_data_block); } return true; } else if (all_finished) { @@ -95,14 +75,11 @@ bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState* local_st if (_data_queue[channel_id].try_dequeue(block)) { if constexpr (std::is_same_v || std::is_same_v) { - local_state->_shared_state->sub_mem_usage( - channel_id, block.first->data_block.allocated_bytes()); - } else { local_state->_shared_state->sub_mem_usage(channel_id, - block->data_block.allocated_bytes()); - data_block->swap(block->data_block); - block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id); - DCHECK_EQ(block->ref_value(), 0); + block.first->_allocated_bytes); + } else { + local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); + data_block->swap(block->_data_block); } return true; } @@ -114,19 +91,13 @@ bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState* local_st template void Exchanger::_enqueue_data_and_set_ready(int channel_id, BlockType&& block) { - if constexpr (!std::is_same_v && - !std::is_same_v) { - block->ref(1); - } - if (!_data_queue[channel_id].enqueue(std::move(block))) { - if constexpr (std::is_same_v || - std::is_same_v) { - block.first->unref(); - } else { - block->unref(); - DCHECK_EQ(block->ref_value(), 0); - } + if constexpr (std::is_same_v || + std::is_same_v) { + block.first->record_channel_id(channel_id); + } else { + block->record_channel_id(channel_id); } + _data_queue[channel_id].enqueue(std::move(block)); } template @@ -135,9 +106,7 @@ bool Exchanger::_dequeue_data(BlockType& block, bool* eos, vectorized if (_data_queue[channel_id].try_dequeue(block)) { if constexpr (!std::is_same_v && !std::is_same_v) { - data_block->swap(block->data_block); - block->unref(); - DCHECK_EQ(block->ref_value(), 0); + data_block->swap(block->_data_block); } return true; } @@ -170,9 +139,7 @@ void ShuffleExchanger::close(SourceInfo&& source_info) { _data_queue[source_info.channel_id].set_eos(); while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, source_info.channel_id)) { - partitioned_block.first->unref( - source_info.local_state ? source_info.local_state->_shared_state : nullptr, - source_info.channel_id); + // do nothing } } @@ -186,12 +153,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block const auto* offset_start = partitioned_block.second.row_idxs->data() + partitioned_block.second.offset_start; auto block_wrapper = partitioned_block.first; - Defer defer {[&]() { - block_wrapper->unref( - source_info.local_state ? source_info.local_state->_shared_state : nullptr, - source_info.channel_id); - }}; - RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, offset_start, + RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block, offset_start, offset_start + partitioned_block.second.length)); } while (mutable_block.rows() < state->batch_size() && !*eos && _dequeue_data(source_info.local_state, partitioned_block, eos, block, @@ -203,7 +165,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block source_info.channel_id)) { SCOPED_TIMER(profile.copy_data_timer); mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( - block, partitioned_block.first->data_block); + block, partitioned_block.first->_data_block); RETURN_IF_ERROR(get_data()); } return Status::OK(); @@ -235,52 +197,31 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest vectorized::Block data_block; std::shared_ptr new_block_wrapper; - if (_free_blocks.try_dequeue(data_block)) { - new_block_wrapper = BlockWrapper::create_shared(std::move(data_block)); - } else { - new_block_wrapper = BlockWrapper::create_shared(block->clone_empty()); + if (!_free_blocks.try_dequeue(data_block)) { + data_block = block->clone_empty(); } - - new_block_wrapper->data_block.swap(*block); - if (new_block_wrapper->data_block.empty()) { + data_block.swap(*block); + new_block_wrapper = + BlockWrapper::create_shared(std::move(data_block), local_state->_shared_state, -1); + if (new_block_wrapper->_data_block.empty()) { return Status::OK(); } - local_state->_shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(), - channel_id); - if (get_type() == ExchangeType::HASH_SHUFFLE) { - /** - * If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed to all instances of - * all BEs. So we need a shuffleId-To-InstanceId mapping. - * For example, row 1 get a hash value 1 which means we should distribute to instance 1 on - * BE 1 and row 2 get a hash value 2 which means we should distribute to instance 1 on BE 3. - */ - DCHECK(shuffle_idx_to_instance_idx && shuffle_idx_to_instance_idx->size() > 0); - const auto& map = *shuffle_idx_to_instance_idx; - new_block_wrapper->ref(cast_set(map.size())); - for (const auto& it : map) { - DCHECK(it.second >= 0 && it.second < _num_partitions) - << it.first << " : " << it.second << " " << _num_partitions; - uint32_t start = partition_rows_histogram[it.first]; - uint32_t size = partition_rows_histogram[it.first + 1] - start; - if (size > 0) { - _enqueue_data_and_set_ready(it.second, local_state, - {new_block_wrapper, {row_idx, start, size}}); - } else { - new_block_wrapper->unref(local_state->_shared_state, channel_id); - } - } - } else { - DCHECK(shuffle_idx_to_instance_idx && shuffle_idx_to_instance_idx->size() > 0); - new_block_wrapper->ref(_num_partitions); - for (int i = 0; i < _num_partitions; i++) { - uint32_t start = partition_rows_histogram[i]; - uint32_t size = partition_rows_histogram[i + 1] - start; - if (size > 0) { - _enqueue_data_and_set_ready((*shuffle_idx_to_instance_idx)[i], local_state, - {new_block_wrapper, {row_idx, start, size}}); - } else { - new_block_wrapper->unref(local_state->_shared_state, channel_id); - } + /** + * Data are hash-shuffled and distributed to all instances of + * all BEs. So we need a shuffleId-To-InstanceId mapping. + * For example, row 1 get a hash value 1 which means we should distribute to instance 1 on + * BE 1 and row 2 get a hash value 2 which means we should distribute to instance 1 on BE 3. + */ + DCHECK(shuffle_idx_to_instance_idx && shuffle_idx_to_instance_idx->size() > 0); + const auto& map = *shuffle_idx_to_instance_idx; + for (const auto& it : map) { + DCHECK(it.second >= 0 && it.second < _num_partitions) + << it.first << " : " << it.second << " " << _num_partitions; + uint32_t start = partition_rows_histogram[it.first]; + uint32_t size = partition_rows_histogram[it.first + 1] - start; + if (size > 0) { + _enqueue_data_and_set_ready(it.second, local_state, + {new_block_wrapper, {row_idx, start, size}}); } } @@ -308,24 +249,19 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest vectorized::Block data_block; std::shared_ptr new_block_wrapper; - if (_free_blocks.try_dequeue(data_block)) { - new_block_wrapper = BlockWrapper::create_shared(std::move(data_block)); - } else { - new_block_wrapper = BlockWrapper::create_shared(block->clone_empty()); + if (!_free_blocks.try_dequeue(data_block)) { + data_block = block->clone_empty(); } - - new_block_wrapper->data_block.swap(*block); - if (new_block_wrapper->data_block.empty()) { + data_block.swap(*block); + new_block_wrapper = BlockWrapper::create_shared(std::move(data_block), nullptr, -1); + if (new_block_wrapper->_data_block.empty()) { return Status::OK(); } - new_block_wrapper->ref(cast_set(_num_partitions)); for (int i = 0; i < _num_partitions; i++) { uint32_t start = partition_rows_histogram[i]; uint32_t size = partition_rows_histogram[i + 1] - start; if (size > 0) { _enqueue_data_and_set_ready(i, {new_block_wrapper, {row_idx, start, size}}); - } else { - new_block_wrapper->unref(); } } @@ -338,13 +274,15 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo return Status::OK(); } vectorized::Block new_block; - BlockWrapperSPtr wrapper; if (!_free_blocks.try_dequeue(new_block)) { new_block = {in_block->clone_empty()}; } new_block.swap(*in_block); - wrapper = BlockWrapper::create_shared(std::move(new_block)); auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; + BlockWrapperSPtr wrapper = BlockWrapper::create_shared( + std::move(new_block), + sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, channel_id); + _enqueue_data_and_set_ready(channel_id, sink_info.local_state, std::move(wrapper)); return Status::OK(); @@ -390,7 +328,9 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block } new_block.swap(*in_block); - BlockWrapperSPtr wrapper = BlockWrapper::create_shared(std::move(new_block)); + BlockWrapperSPtr wrapper = BlockWrapper::create_shared( + std::move(new_block), + sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, 0); _enqueue_data_and_set_ready(0, sink_info.local_state, std::move(wrapper)); return Status::OK(); @@ -417,8 +357,12 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_ DCHECK_LE(*sink_info.channel_id, _data_queue.size()); new_block.swap(*in_block); - _enqueue_data_and_set_ready(*sink_info.channel_id, sink_info.local_state, - BlockWrapper::create_shared(std::move(new_block))); + _enqueue_data_and_set_ready( + *sink_info.channel_id, sink_info.local_state, + BlockWrapper::create_shared( + std::move(new_block), + sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, + *sink_info.channel_id)); } if (eos && sink_info.local_state) { sink_info.local_state->_shared_state->source_deps[*sink_info.channel_id] @@ -503,16 +447,12 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block new_block = {in_block->clone_empty()}; } new_block.swap(*in_block); - auto wrapper = BlockWrapper::create_shared(std::move(new_block)); - if (sink_info.local_state) { - sink_info.local_state->_shared_state->add_total_mem_usage( - wrapper->data_block.allocated_bytes(), *sink_info.channel_id); - } - - wrapper->ref(_num_partitions); + auto wrapper = BlockWrapper::create_shared( + std::move(new_block), + sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, -1); for (int i = 0; i < _num_partitions; i++) { _enqueue_data_and_set_ready(i, sink_info.local_state, - {wrapper, {0, wrapper->data_block.rows()}}); + {wrapper, {0, wrapper->_data_block.rows()}}); } return Status::OK(); @@ -525,9 +465,7 @@ void BroadcastExchanger::close(SourceInfo&& source_info) { _data_queue[source_info.channel_id].set_eos(); while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, source_info.channel_id)) { - partitioned_block.first->unref( - source_info.local_state ? source_info.local_state->_shared_state : nullptr, - source_info.channel_id); + // do nothing } } @@ -540,14 +478,11 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo SCOPED_TIMER(profile.copy_data_timer); vectorized::MutableBlock mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( - block, partitioned_block.first->data_block); + block, partitioned_block.first->_data_block); auto block_wrapper = partitioned_block.first; - RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, + RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->_data_block, partitioned_block.second.offset_start, partitioned_block.second.length)); - block_wrapper->unref( - source_info.local_state ? source_info.local_state->_shared_state : nullptr, - source_info.channel_id); } return Status::OK(); @@ -562,8 +497,12 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, } new_block.swap(*in_block); auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; - _enqueue_data_and_set_ready(channel_id, sink_info.local_state, - BlockWrapper::create_shared(std::move(new_block))); + _enqueue_data_and_set_ready( + channel_id, sink_info.local_state, + BlockWrapper::create_shared( + std::move(new_block), + sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, + channel_id)); return Status::OK(); } @@ -616,8 +555,12 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, RETURN_IF_ERROR(mutable_block->add_rows(block, start, size)); auto new_block = mutable_block->to_block(); - _enqueue_data_and_set_ready(i, sink_info.local_state, - BlockWrapper::create_shared(std::move(new_block))); + _enqueue_data_and_set_ready( + i, sink_info.local_state, + BlockWrapper::create_shared( + std::move(new_block), + sink_info.local_state ? sink_info.local_state->_shared_state : nullptr, + i)); } } return Status::OK(); diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 4aace54e6e3e57..90edeca07e8568 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -28,7 +28,7 @@ class PartitionerBase; namespace pipeline { class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; -struct BlockWrapper; +class BlockWrapper; class SortSourceOperatorX; struct Profile { @@ -63,6 +63,66 @@ struct SourceInfo { */ class ExchangerBase { public: + /** + * `BlockWrapper` is used to wrap a data block with a reference count. + * + * In function `unref()`, if `ref_count` decremented to 0, which means this block is not needed by + * operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage + * in current queue. + * + * Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in + * shuffle exchanger. + */ + class BlockWrapper { + public: + ENABLE_FACTORY_CREATOR(BlockWrapper); + BlockWrapper(vectorized::Block&& data_block, LocalExchangeSharedState* shared_state, + int channel_id) + : _data_block(std::move(data_block)), + _shared_state(shared_state), + _allocated_bytes(_data_block.allocated_bytes()) { + if (_shared_state) { + _shared_state->add_total_mem_usage(_allocated_bytes, channel_id); + } + } + ~BlockWrapper() { + if (_shared_state != nullptr) { + DCHECK_GT(_allocated_bytes, 0); + _shared_state->sub_total_mem_usage(_allocated_bytes, _channel_ids.front()); + if (_shared_state->exchanger->_free_block_limit == 0 || + _shared_state->exchanger->_free_blocks.size_approx() < + _shared_state->exchanger->_free_block_limit * + _shared_state->exchanger->_num_sources) { + _data_block.clear_column_data(); + // Free blocks is used to improve memory efficiency. Failure during pushing back + // free block will not incur any bad result so just ignore the return value. + _shared_state->exchanger->_free_blocks.enqueue(std::move(_data_block)); + } + }; + } + void record_channel_id(int channel_id) { + _channel_ids.push_back(channel_id); + if (_shared_state) { + _shared_state->add_mem_usage(channel_id, _allocated_bytes); + } + } + + private: + friend class ShuffleExchanger; + friend class BucketShuffleExchanger; + friend class PassthroughExchanger; + friend class BroadcastExchanger; + friend class PassToOneExchanger; + friend class LocalMergeSortExchanger; + friend class AdaptivePassthroughExchanger; + template + friend class Exchanger; + + vectorized::Block _data_block; + LocalExchangeSharedState* _shared_state; + std::vector _channel_ids; + const size_t _allocated_bytes; + }; ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit) : _running_sink_operators(running_sink_operators), _running_source_operators(num_partitions), @@ -94,7 +154,6 @@ class ExchangerBase { protected: friend struct LocalExchangeSharedState; - friend struct BlockWrapper; friend class LocalExchangeSourceLocalState; friend class LocalExchangeSinkOperatorX; friend class LocalExchangeSinkLocalState; @@ -113,13 +172,14 @@ struct PartitionedRowIdxs { uint32_t length; }; -using PartitionedBlock = std::pair, PartitionedRowIdxs>; +using PartitionedBlock = + std::pair, PartitionedRowIdxs>; struct RowRange { uint32_t offset_start; size_t length; }; -using BroadcastBlock = std::pair, RowRange>; +using BroadcastBlock = std::pair, RowRange>; template struct BlockQueue { @@ -158,7 +218,7 @@ struct BlockQueue { void set_eos() { eos = true; } }; -using BlockWrapperSPtr = std::shared_ptr; +using BlockWrapperSPtr = std::shared_ptr; template class Exchanger : public ExchangerBase { @@ -201,45 +261,6 @@ class Exchanger : public ExchangerBase { class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; -/** - * `BlockWrapper` is used to wrap a data block with a reference count. - * - * In function `unref()`, if `ref_count` decremented to 0, which means this block is not needed by - * operators, so we put it into `_free_blocks` to reuse its memory if needed and refresh memory usage - * in current queue. - * - * Note: `ref_count` will be larger than 1 only if this block is shared between multiple queues in - * shuffle exchanger. - */ -struct BlockWrapper { - ENABLE_FACTORY_CREATOR(BlockWrapper); - BlockWrapper(vectorized::Block&& data_block_) : data_block(std::move(data_block_)) {} - ~BlockWrapper() { DCHECK_EQ(ref_count.load(), 0); } - void ref(int delta) { ref_count += delta; } - void unref(LocalExchangeSharedState* shared_state, size_t allocated_bytes, int channel_id) { - if (ref_count.fetch_sub(1) == 1 && shared_state != nullptr) { - DCHECK_GT(allocated_bytes, 0); - shared_state->sub_total_mem_usage(allocated_bytes, channel_id); - if (shared_state->exchanger->_free_block_limit == 0 || - shared_state->exchanger->_free_blocks.size_approx() < - shared_state->exchanger->_free_block_limit * - shared_state->exchanger->_num_sources) { - data_block.clear_column_data(); - // Free blocks is used to improve memory efficiency. Failure during pushing back - // free block will not incur any bad result so just ignore the return value. - shared_state->exchanger->_free_blocks.enqueue(std::move(data_block)); - } - } - } - - void unref(LocalExchangeSharedState* shared_state = nullptr, int channel_id = 0) { - unref(shared_state, data_block.allocated_bytes(), channel_id); - } - int ref_value() const { return ref_count.load(); } - std::atomic ref_count = 0; - vectorized::Block data_block; -}; - class ShuffleExchanger : public Exchanger { public: ENABLE_FACTORY_CREATOR(ShuffleExchanger); diff --git a/be/test/pipeline/local_exchanger_test.cpp b/be/test/pipeline/local_exchanger_test.cpp index 3db2375866cc96..9da285dd9be726 100644 --- a/be/test/pipeline/local_exchanger_test.cpp +++ b/be/test/pipeline/local_exchanger_test.cpp @@ -104,6 +104,23 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) { _sink_local_states[i]->_partitioner.reset( new vectorized::Crc32HashPartitioner( num_partitions)); + auto texpr = + TExprNodeBuilder(TExprNodeType::SLOT_REF, + TTypeDescBuilder() + .set_types(TTypeNodeBuilder() + .set_type(TTypeNodeType::SCALAR) + .set_scalar_type(TPrimitiveType::INT) + .build()) + .build(), + 0) + .set_slot_ref(TSlotRefBuilder(0, 0).build()) + .build(); + auto slot = doris::vectorized::VSlotRef::create_shared(texpr); + slot->_column_id = 0; + ((vectorized::Crc32HashPartitioner*)_sink_local_states[i] + ->_partitioner.get()) + ->_partition_expr_ctxs.push_back( + std::make_shared(slot)); _sink_local_states[i]->_channel_id = i; _sink_local_states[i]->_shared_state = shared_state.get(); _sink_local_states[i]->_dependency = sink_dep.get(); @@ -124,11 +141,15 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) { shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter; } + const auto expect_block_bytes = 128; + const auto num_blocks = 2; + config::local_exchange_buffer_mem_limit = + (num_partitions - 1) * num_blocks * expect_block_bytes; { // Enqueue 2 blocks with 10 rows for each data queue. for (size_t i = 0; i < num_partitions; i++) { hash_vals_and_value.push_back({std::vector {}, i}); - for (size_t j = 0; j < 2; j++) { + for (size_t j = 0; j < num_blocks; j++) { vectorized::Block in_block; vectorized::DataTypePtr int_type = std::make_shared(); auto int_col0 = vectorized::ColumnInt32::create(); @@ -142,26 +163,8 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) { PrimitiveType::TYPE_INT, cast_set(int_col0->size()), 0, nullptr); in_block.insert({std::move(int_col0), int_type, "test_int_col0"}); + EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes()); bool in_eos = false; - auto texpr = - TExprNodeBuilder( - TExprNodeType::SLOT_REF, - TTypeDescBuilder() - .set_types(TTypeNodeBuilder() - .set_type(TTypeNodeType::SCALAR) - .set_scalar_type(TPrimitiveType::INT) - .build()) - .build(), - 0) - .set_slot_ref(TSlotRefBuilder(0, 0).build()) - .build(); - auto slot = doris::vectorized::VSlotRef::create_shared(texpr); - slot->_column_id = 0; - ((vectorized::Crc32HashPartitioner*) - _sink_local_states[i] - ->_partitioner.get()) - ->_partition_expr_ctxs.push_back( - std::make_shared(slot)); EXPECT_EQ(exchanger->sink( _runtime_state.get(), &in_block, in_eos, {_sink_local_states[i]->_compute_hash_value_timer, @@ -170,6 +173,7 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) { _sink_local_states[i]->_partitioner.get(), _sink_local_states[i].get(), &shuffle_idx_to_instance_idx}), Status::OK()); + EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), i < num_partitions - 1); EXPECT_EQ(_sink_local_states[i]->_channel_id, i); } } @@ -201,6 +205,51 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) { } EXPECT_EQ(shared_state->mem_usage, 0); } + { + // Add new block and source dependency will be ready again. + for (size_t i = 0; i < num_partitions; i++) { + EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), true); + vectorized::Block in_block; + vectorized::DataTypePtr int_type = std::make_shared(); + auto int_col0 = vectorized::ColumnInt32::create(); + int_col0->insert_many_vals(hash_vals_and_value[i].second, 10); + + auto pre_size = hash_vals_and_value[i].first.size(); + hash_vals_and_value[i].first.resize(pre_size + 10); + std::fill(hash_vals_and_value[i].first.begin() + pre_size, + hash_vals_and_value[i].first.end(), 0); + int_col0->update_crcs_with_value(hash_vals_and_value[i].first.data() + pre_size, + PrimitiveType::TYPE_INT, + cast_set(int_col0->size()), 0, nullptr); + EXPECT_EQ(hash_vals_and_value[i].first.front(), hash_vals_and_value[i].first.back()); + in_block.insert({std::move(int_col0), int_type, "test_int_col0"}); + EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes()); + bool in_eos = false; + EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos, + {_sink_local_states[i]->_compute_hash_value_timer, + _sink_local_states[i]->_distribute_timer, nullptr}, + {&_sink_local_states[i]->_channel_id, + _sink_local_states[i]->_partitioner.get(), + _sink_local_states[i].get(), &shuffle_idx_to_instance_idx}), + Status::OK()); + EXPECT_EQ(_sink_local_states[i]->_channel_id, i); + } + for (const auto& it : hash_vals_and_value) { + bool eos = false; + auto channel_id = it.first.back() % num_partitions; + EXPECT_EQ(_local_states[channel_id]->_dependency->ready(), true); + vectorized::Block block; + EXPECT_EQ(exchanger->get_block( + _runtime_state.get(), &block, &eos, + {nullptr, nullptr, _local_states[channel_id]->_copy_data_timer}, + {cast_set(_local_states[channel_id]->_channel_id), + _local_states[channel_id].get()}), + Status::OK()); + EXPECT_EQ(block.rows(), 10); + EXPECT_EQ(eos, false); + EXPECT_EQ(_local_states[channel_id]->_dependency->ready(), false); + } + } for (size_t i = 0; i < num_sources; i++) { EXPECT_EQ(exchanger->_data_queue[i].eos, false); EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0); @@ -250,23 +299,6 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) { cast_set(int_col0->size()), 0, nullptr); in_block.insert({std::move(int_col0), int_type, "test_int_col0"}); bool in_eos = false; - auto texpr = TExprNodeBuilder( - TExprNodeType::SLOT_REF, - TTypeDescBuilder() - .set_types(TTypeNodeBuilder() - .set_type(TTypeNodeType::SCALAR) - .set_scalar_type(TPrimitiveType::INT) - .build()) - .build(), - 0) - .set_slot_ref(TSlotRefBuilder(0, 0).build()) - .build(); - auto slot = doris::vectorized::VSlotRef::create_shared(texpr); - slot->_column_id = 0; - ((vectorized::Crc32HashPartitioner*)_sink_local_states[i] - ->_partitioner.get()) - ->_partition_expr_ctxs.push_back( - std::make_shared(slot)); EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos, {_sink_local_states[i]->_compute_hash_value_timer, _sink_local_states[i]->_distribute_timer, nullptr},