Skip to content

Commit

Permalink
remove some unused param
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Oct 31, 2024
1 parent 588a87d commit fa5ccd3
Show file tree
Hide file tree
Showing 7 changed files with 9 additions and 24 deletions.
10 changes: 0 additions & 10 deletions be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,6 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) {
return fmt::to_string(debug_string_buffer);
}

Dependency* RuntimeFilterDependency::is_blocked_by(PipelineTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load();
if (!ready && task) {
_add_block_task(task);
task->_blocked_dep = this;
}
return ready ? nullptr : this;
}

void RuntimeFilterTimer::call_timeout() {
_parent->set_ready();
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,6 @@ class RuntimeFilterDependency final : public Dependency {
: Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
std::string debug_string(int indentation_level = 0) override;

Dependency* is_blocked_by(PipelineTask* task) override;

private:
const IRuntimeFilter* _runtime_filter = nullptr;
};
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
friend class PipelineFragmentContext;

public:
explicit Pipeline(PipelineId pipeline_id, int num_tasks,
std::weak_ptr<PipelineFragmentContext> context, int num_tasks_of_parent)
explicit Pipeline(PipelineId pipeline_id, int num_tasks, int num_tasks_of_parent)
: _pipeline_id(pipeline_id),
_num_tasks(num_tasks),
_num_tasks_of_parent(num_tasks_of_parent) {
Expand Down Expand Up @@ -86,7 +85,9 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {

std::vector<std::shared_ptr<Pipeline>>& children() { return _children; }
void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
void set_children(std::vector<std::shared_ptr<Pipeline>> children) {
_children = std::move(children);
}

void incr_created_tasks(int i, PipelineTask* task) {
_num_tasks_created++;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
PipelineId id = _next_pipeline_id++;
auto pipeline = std::make_shared<Pipeline>(
id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
parent ? parent->num_tasks() : _num_instances);
if (idx >= 0) {
_pipelines.insert(_pipelines.begin() + idx, pipeline);
Expand Down
7 changes: 2 additions & 5 deletions be/src/pipeline/task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,14 @@
namespace doris {
class ExecEnv;
class ThreadPool;

namespace pipeline {
class TaskQueue;
} // namespace pipeline
} // namespace doris

namespace doris::pipeline {
class TaskQueue;

class TaskScheduler {
public:
TaskScheduler(ExecEnv* exec_env, std::shared_ptr<TaskQueue> task_queue, std::string name,
TaskScheduler(std::shared_ptr<TaskQueue> task_queue, std::string name,
CgroupCpuCtl* cgroup_cpu_ctl)
: _task_queue(std::move(task_queue)),
_shutdown(false),
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ Status ExecEnv::init_pipeline_task_scheduler() {
// TODO pipeline workload group combie two blocked schedulers.
auto t_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
_without_group_task_scheduler =
new pipeline::TaskScheduler(this, t_queue, "PipeNoGSchePool", nullptr);
new pipeline::TaskScheduler(t_queue, "PipeNoGSchePool", nullptr);
RETURN_IF_ERROR(_without_group_task_scheduler->start());

_runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue();
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
}
auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
std::make_unique<pipeline::TaskScheduler>(exec_env, std::move(task_queue),
"Pipe_" + tg_name, cg_cpu_ctl_ptr);
std::make_unique<pipeline::TaskScheduler>(std::move(task_queue), "Pipe_" + tg_name,
cg_cpu_ctl_ptr);
Status ret = pipeline_task_scheduler->start();
if (ret.ok()) {
_task_sched = std::move(pipeline_task_scheduler);
Expand Down

0 comments on commit fa5ccd3

Please sign in to comment.