Skip to content

Commit

Permalink
[feat](spill) spill and reserve (apache#47462)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Problem Summary:

A brand-new spilling triggering strategy:
1. Use workload groups to control/manage the memory usage of queries. 
2. Trigger spilling when the memory reservation attempt fails.
  • Loading branch information
mrhhsg authored Feb 25, 2025
1 parent 1f0b0db commit 34756a4
Show file tree
Hide file tree
Showing 459 changed files with 7,206 additions and 2,536 deletions.
18 changes: 11 additions & 7 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "agent/workload_group_listener.h"

#include <thrift/protocol/TDebugProtocol.h>

#include "runtime/exec_env.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
Expand All @@ -33,6 +35,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
if (!topic_info.__isset.workload_group_info) {
continue;
}
VLOG_DEBUG << "Received publish workload group info request: "
<< apache::thrift::ThriftDebugString(topic_info).c_str();
is_set_workload_group_info = true;

// 1 parse topic info to group info
Expand Down Expand Up @@ -65,13 +69,13 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
// 5 upsert io throttle
wg->upsert_scan_io_throttle(&workload_group_info);

LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info="
<< wg->debug_string() << ", enable_cpu_hard_limit="
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
<< ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
<< ", cgroup home path=" << config::doris_cgroup_cpu_path
<< ", list size=" << list_size << ", thread info=" << wg->thread_debug_info();
VLOG_DEBUG << "[topic_publish_wg]update workload group finish, wg info="
<< wg->debug_string() << ", enable_cpu_hard_limit="
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
<< ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
<< ", cgroup home path=" << config::doris_cgroup_cpu_path
<< ", list size=" << list_size << ", thread info=" << wg->thread_debug_info();
}

// NOTE(wb) when is_set_workload_group_info=false, it means FE send a empty workload group list
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
}

Status CloudDeltaWriter::write(const vectorized::Block* block,
const std::vector<uint32_t>& row_idxs) {
const DorisVector<uint32_t>& row_idxs) {
if (row_idxs.empty()) [[unlikely]] {
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class CloudDeltaWriter final : public BaseDeltaWriter {
const UniqueId& load_id);
~CloudDeltaWriter() override;

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) override;
Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) override;

Status close() override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
return Status::OK();
}

std::unordered_map<int64_t, std::vector<uint32_t>> tablet_to_rowidxs;
std::unordered_map<int64_t, DorisVector<uint32_t>> tablet_to_rowidxs;
_build_tablet_to_rowidxs(request, &tablet_to_rowidxs);

std::unordered_set<int64_t> partition_ids;
Expand Down
6 changes: 5 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <fmt/core.h>
#include <gflags/gflags.h>
#include <stdint.h>

#include <algorithm>
Expand Down Expand Up @@ -118,7 +119,7 @@ DEFINE_String(mem_limit, "90%");
DEFINE_Double(soft_mem_limit_frac, "0.9");

// Cache capacity reduce mem limit as a fraction of soft mem limit.
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6");
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.7");

// Schema change memory limit as a fraction of soft memory limit.
DEFINE_Double(schema_change_mem_limit_frac, "0.6");
Expand Down Expand Up @@ -1286,6 +1287,9 @@ DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool {
});
DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");

// paused query in queue timeout(ms) will be resumed or canceled
DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000");

DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

DEFINE_mInt32(max_s3_client_retry, "10");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,7 @@ DECLARE_mInt32(spill_gc_interval_ms);
DECLARE_mInt32(spill_gc_work_time_ms);
DECLARE_Int32(spill_io_thread_pool_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int64(spill_in_paused_queue_timeout_ms);

DECLARE_mBool(check_segment_when_build_rowset_meta);

Expand Down
10 changes: 6 additions & 4 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,8 @@ void Daemon::memory_maintenance_thread() {
doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();

// step 7. Analyze blocking queries.
// TODO sort the operators that can spill, wake up the pipeline task spill
// or continue execution according to certain rules or cancel query.
// step 7: handle paused queries(caused by memory insufficient)
doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();

// step 8. Flush memtable
doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh();
Expand Down Expand Up @@ -547,7 +546,9 @@ void Daemon::cache_adjust_capacity_thread() {
doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
l, std::chrono::milliseconds(100));
}
double adjust_weighted = GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted;
double adjust_weighted = std::min<double>(
GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted,
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted);
if (_stop_background_threads_latch.count() == 0) {
break;
}
Expand All @@ -567,6 +568,7 @@ void Daemon::cache_adjust_capacity_thread() {
LOG(INFO) << fmt::format(
"[MemoryGC] refresh cache capacity end, free memory {}, details: {}",
PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted;
} while (true);
}

Expand Down
14 changes: 14 additions & 0 deletions be/src/common/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,18 @@ class TaggableLogger {
#define LOG_ERROR TaggableLogger(__FILE__, __LINE__, google::GLOG_ERROR)
#define LOG_FATAL TaggableLogger(__FILE__, __LINE__, google::GLOG_FATAL)

// Avoid the printed log message is truncated by the glog max log size limit
#define LOG_LONG_STRING(severity, long_log_str) \
do { \
constexpr size_t max_log_size = 30000 - 100; \
size_t pos = 0; \
size_t total_size = long_log_str.size(); \
size_t tmp_size = std::min(max_log_size, total_size); \
while (pos < total_size) { \
tmp_size = std::min(max_log_size, total_size - pos); \
LOG(severity) << std::string(long_log_str.data() + pos, tmp_size); \
pos += tmp_size; \
} \
} while (0)

} // namespace doris
10 changes: 10 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ namespace ErrorCode {
E(BAD_CAST, -254, true); \
E(ARITHMETIC_OVERFLOW_ERRROR, -255, false); \
E(PERMISSION_DENIED, -256, false); \
E(QUERY_MEMORY_EXCEEDED, -257, false); \
E(WORKLOAD_GROUP_MEMORY_EXCEEDED, -258, false); \
E(PROCESS_MEMORY_EXCEEDED, -259, false); \
E(CE_CMD_PARAMS_ERROR, -300, true); \
E(CE_BUFFER_TOO_SMALL, -301, true); \
E(CE_CMD_NOT_VALID, -302, true); \
Expand Down Expand Up @@ -381,6 +384,11 @@ class [[nodiscard]] Status {
_code = rhs._code;
if (rhs._err_msg) {
_err_msg = std::make_unique<ErrMsg>(*rhs._err_msg);
} else {
// If rhs error msg is empty, then should also clear current error msg
// For example, if rhs is OK and current status is error, then copy to current
// status, should clear current error message.
_err_msg.reset();
}
return *this;
}
Expand All @@ -390,6 +398,8 @@ class [[nodiscard]] Status {
_code = rhs._code;
if (rhs._err_msg) {
_err_msg = std::move(rhs._err_msg);
} else {
_err_msg.reset();
}
return *this;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
{"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
{"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
{"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
Expand All @@ -41,6 +42,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
{"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
{"QUERY_TYPE", TYPE_VARCHAR, sizeof(StringRef), false},
{"SPILL_WRITE_BYTES_TO_LOCAL_STORAGE", TYPE_BIGINT, sizeof(int64_t), false},
{"SPILL_READ_BYTES_FROM_LOCAL_STORAGE", TYPE_BIGINT, sizeof(int64_t), false},
};

SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendWorkloadGroupResourceUsage::
{"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false},
{"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"WRITE_BUFFER_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
};

SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage()
Expand Down
5 changes: 4 additions & 1 deletion be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ class VRuntimeFilterSlots {

Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto filter : _runtime_filters) {
for (auto& filter : _runtime_filters) {
if (filter->get_ignored()) {
continue;
}
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter.get(), local_hash_table_size) >
state->runtime_filter_max_in_num()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Status BaseDeltaWriter::init() {
return Status::OK();
}

Status DeltaWriter::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) {
Status DeltaWriter::write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BaseDeltaWriter {

virtual ~BaseDeltaWriter();

virtual Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) = 0;
virtual Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) = 0;

// flush the last memtable to flush queue, must call it before build_rowset()
virtual Status close() = 0;
Expand Down Expand Up @@ -123,7 +123,7 @@ class DeltaWriter final : public BaseDeltaWriter {

~DeltaWriter() override;

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) override;
Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) override;

Status close() override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ Status DeltaWriterV2::init() {
return Status::OK();
}

Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) {
Status DeltaWriterV2::write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class DeltaWriterV2 {

Status init();

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs);

// flush the last memtable to flush queue, must call it before close_wait()
Status close();
Expand Down
Loading

0 comments on commit 34756a4

Please sign in to comment.