Skip to content

Commit

Permalink
[Refactor](scan) refactor scan scheduler to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Dec 4, 2023
1 parent 86c2b93 commit 99f925f
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 56 deletions.
3 changes: 3 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ class RuntimeState {
const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
int batch_size() const { return _query_options.batch_size; }
// Accessor for the `wait_full_block_schedule_times` query option.
// NOTE(review): the scanner scheduler appears to use this value as a multiplier on its
// raw bytes/rows read thresholds while the current block is still below batch_size —
// confirm against the caller.
int wait_full_block_schedule_times() const {
return _query_options.wait_full_block_schedule_times;
}
bool abort_on_error() const { return _query_options.abort_on_error; }
bool abort_on_default_limit_exceeded() const {
return _query_options.abort_on_default_limit_exceeded;
Expand Down
22 changes: 22 additions & 0 deletions be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class PipScannerContext : public vectorized::ScannerContext {
}
}

std::vector<vectorized::BlockUPtr> merge_blocks;
{
std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
if (_blocks_queues[id].empty()) {
Expand All @@ -81,9 +82,30 @@ class PipScannerContext : public vectorized::ScannerContext {
if (_dependency) {
_dependency->block();
}
} else {
auto rows = (*block)->rows();
do {
const auto add_rows = (*_blocks_queues[id].front()).rows();
if (rows + add_rows < state->batch_size()) {
rows += add_rows;
merge_blocks.emplace_back(std::move(_blocks_queues[id].front()));
_blocks_queues[id].pop_front();
} else {
break;
}
} while (!_blocks_queues[id].empty());
}
}

_current_used_bytes -= (*block)->allocated_bytes();
if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
_current_used_bytes -= merge_block->allocated_bytes();
static_cast<void>(m.merge(*merge_block));
return_free_block(std::move(merge_block));
}
}

return Status::OK();
}
Expand Down
125 changes: 75 additions & 50 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,65 +209,90 @@ bool ScannerContext::empty_in_queue(int id) {

Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
bool* eos, int id, bool wait) {
std::unique_lock l(_transfer_lock);
// Normally, the scanner scheduler will schedule ctx.
// But when the amount of data in the blocks queue exceeds the upper limit,
// the scheduler will stop scheduling.
// (if the scheduler continues to schedule, it will cause a lot of busy running).
// At this point, consumers are required to trigger new scheduling to ensure that
// data can be continuously fetched.
int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
int32_t serving_blocks_num = _serving_blocks_num;
bool to_be_schedule = should_be_scheduled();
int num_running_scanners = _num_running_scanners;

bool is_scheduled = false;
if (to_be_schedule && _num_running_scanners == 0) {
is_scheduled = true;
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
} else {
set_status_on_error(state, false);
std::vector<vectorized::BlockUPtr> merge_blocks;
{
std::unique_lock l(_transfer_lock);
// Normally, the scanner scheduler will schedule ctx.
// But when the amount of data in the blocks queue exceeds the upper limit,
// the scheduler will stop scheduling.
// (if the scheduler continues to schedule, it will cause a lot of busy running).
// At this point, consumers are required to trigger new scheduling to ensure that
// data can be continuously fetched.
int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
int32_t serving_blocks_num = _serving_blocks_num;
bool to_be_schedule = should_be_scheduled();
int num_running_scanners = _num_running_scanners;

bool is_scheduled = false;
if (to_be_schedule && _num_running_scanners == 0) {
is_scheduled = true;
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
} else {
set_status_on_error(state, false);
}
}
}

// Wait for block from queue
if (wait) {
// scanner batch wait time
SCOPED_TIMER(_scanner_wait_batch_timer);
while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
state->is_cancelled())) {
if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
<< ", serving_blocks_num " << serving_blocks_num
<< ", num_running_scanners " << num_running_scanners
<< ", to_be_scheudle " << to_be_schedule << (void*)this;
// Wait for block from queue
if (wait) {
// scanner batch wait time
SCOPED_TIMER(_scanner_wait_batch_timer);
while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
state->is_cancelled())) {
if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
<< ", serving_blocks_num " << serving_blocks_num
<< ", num_running_scanners " << num_running_scanners
<< ", to_be_scheudle " << to_be_schedule << (void*)this;
}
_blocks_queue_added_cv.wait_for(l, 1s);
}
_blocks_queue_added_cv.wait_for(l, 1s);
}
}

if (state->is_cancelled()) {
set_status_on_error(Status::Cancelled("cancelled"), false);
}

if (!status().ok()) {
return status();
}
if (state->is_cancelled()) {
set_status_on_error(Status::Cancelled("cancelled"), false);
}

if (!_blocks_queue.empty()) {
*block = std::move(_blocks_queue.front());
_blocks_queue.pop_front();
if (!status().ok()) {
return status();
}

auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
if (!_blocks_queue.empty()) {
*block = std::move(_blocks_queue.front());
_blocks_queue.pop_front();
auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
_queued_blocks_memory_usage->add(-block_bytes);

auto rows = (*block)->rows();
while (!_blocks_queue.empty()) {
auto& add_block = _blocks_queue.front();
const auto add_rows = (*add_block).rows();
if (rows + add_rows < state->batch_size()) {
rows += add_rows;
block_bytes = (*add_block).allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
_queued_blocks_memory_usage->add(-block_bytes);
merge_blocks.emplace_back(std::move(add_block));
_blocks_queue.pop_front();
} else {
break;
}
}
} else {
*eos = _is_finished;
}
}

_queued_blocks_memory_usage->add(-block_bytes);
return Status::OK();
} else {
*eos = _is_finished;
if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
static_cast<void>(m.merge(*merge_block));
return_free_block(std::move(merge_block));
}
}

return Status::OK();
}

Expand Down
19 changes: 13 additions & 6 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,18 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
bool should_stop = false;
// Must wait for at least one full block; otherwise a large number of schedule tasks pile up
// in the priority queue, which hurts query latency and query concurrency (for example, SSB 3.3).
while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold &&
num_rows_in_block < state->batch_size()) {
auto should_do_scan = [&, batch_size = state->batch_size(),
time = state->wait_full_block_schedule_times()]() {
if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold) {
return true;
} else if (num_rows_in_block < batch_size) {
return raw_bytes_read < raw_bytes_threshold * time &&
raw_rows_read < raw_rows_threshold * time;
}
return false;
};

while (!eos && should_do_scan()) {
// TODO llj task group should should_yield?
if (UNLIKELY(ctx->done())) {
// No need to set status on error here.
Expand Down Expand Up @@ -384,10 +394,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
ctx->return_free_block(std::move(block));
} else {
if (!blocks.empty() && blocks.back()->rows() + block->rows() <= state->batch_size()) {
status = vectorized::MutableBlock(blocks.back().get()).merge(*block);
if (!status.ok()) {
break;
}
static_cast<void>(vectorized::MutableBlock(blocks.back().get()).merge(*block));
ctx->return_free_block(std::move(block));
} else {
blocks.push_back(std::move(block));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ private void initQueryOptions(ConnectContext context) {
this.queryOptions.setExecutionTimeout(context.getExecTimeout());
this.queryOptions.setEnableScanNodeRunSerial(context.getSessionVariable().isEnableScanRunSerial());
this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
this.queryOptions.setWaitFullBlockScheduleTimes(context.getSessionVariable().getWaitFullBlockScheduleTimes());
}

public ConnectContext getConnectContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ public class SessionVariable implements Serializable, Writable {
// this session variable is set to true.
public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt";

public static final String WAIT_FULL_BLOCK_SCHEDULE_TIMES = "wait_full_block_schedule_times";

public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
Expand Down Expand Up @@ -832,6 +834,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = USE_RF_DEFAULT)
public boolean useRuntimeFilterDefaultSize = false;

// Session variable registered under the name WAIT_FULL_BLOCK_SCHEDULE_TIMES
// ("wait_full_block_schedule_times"); defaults to 2.
@VariableMgr.VarAttr(name = WAIT_FULL_BLOCK_SCHEDULE_TIMES)
public int waitFullBlockScheduleTimes = 2;

public int getBeNumberForTest() {
return beNumberForTest;
}
Expand Down Expand Up @@ -2159,6 +2164,10 @@ public String getSqlDialect() {
return sqlDialect;
}

/**
 * Returns the current value of the {@code waitFullBlockScheduleTimes} session variable.
 *
 * @return the configured wait-full-block schedule times
 */
public int getWaitFullBlockScheduleTimes() {
return waitFullBlockScheduleTimes;
}

public ParseDialect.Dialect getSqlParseDialect() {
return ParseDialect.Dialect.getByName(sqlDialect);
}
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ struct TQueryOptions {
90: optional bool skip_missing_version = false;

91: optional bool runtime_filter_wait_infinitely = false;

92: optional i32 wait_full_block_schedule_times = 1;
}


Expand Down

0 comments on commit 99f925f

Please sign in to comment.