Skip to content

Commit

Permalink
[Improvement](queue) Return value of concurrent queue should be proce… (
Browse files Browse the repository at this point in the history
apache#45032)

…… (apache#44986)

…ssed

Push items into concurrent queue will return false due to some
unexpected error (e.g. poor memory available).
  • Loading branch information
Gabriel39 authored Dec 5, 2024
1 parent 11c517f commit 405b50b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
14 changes: 12 additions & 2 deletions be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,23 @@ struct BlockQueue {
: eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
inline bool enqueue(BlockType const& item) {
if (!eos) {
data_queue.enqueue(item);
if (!data_queue.enqueue(item)) [[unlikely]] {
throw Exception(ErrorCode::INTERNAL_ERROR,
"Exception occurs in data queue [size = {}] of local exchange.",
data_queue.size_approx());
}
return true;
}
return false;
}

inline bool enqueue(BlockType&& item) {
if (!eos) {
data_queue.enqueue(std::move(item));
if (!data_queue.enqueue(std::move(item))) [[unlikely]] {
throw Exception(ErrorCode::INTERNAL_ERROR,
"Exception occurs in data queue [size = {}] of local exchange.",
data_queue.size_approx());
}
return true;
}
return false;
Expand Down Expand Up @@ -146,6 +154,8 @@ struct ShuffleBlockWrapper {
shared_state->exchanger->_free_block_limit *
shared_state->exchanger->_num_sources) {
data_block.clear_column_data();
// Free blocks is used to improve memory efficiency. Failure during pushing back
// free block will not incur any bad result so just ignore the return value.
shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
}
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
_scanners.enqueue_bulk(scanners.begin(), scanners.size());
if (!_scanners.enqueue_bulk(scanners.begin(), scanners.size())) [[unlikely]] {
throw Exception(ErrorCode::INTERNAL_ERROR,
"Exception occurs during scanners initialization.");
};
if (limit < 0) {
limit = -1;
}
Expand Down

0 comments on commit 405b50b

Please sign in to comment.