diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index f4094ef440ffef..5fef018423df25 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -106,16 +106,6 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) { return fmt::to_string(debug_string_buffer); } -Dependency* RuntimeFilterDependency::is_blocked_by(PipelineTask* task) { - std::unique_lock lc(_task_lock); - auto ready = _ready.load(); - if (!ready && task) { - _add_block_task(task); - task->_blocked_dep = this; - } - return ready ? nullptr : this; -} - void RuntimeFilterTimer::call_timeout() { _parent->set_ready(); } diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 9441b8ce09c004..7035ed681a61f9 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -276,8 +276,6 @@ class RuntimeFilterDependency final : public Dependency { : Dependency(id, node_id, name), _runtime_filter(runtime_filter) {} std::string debug_string(int indentation_level = 0) override; - Dependency* is_blocked_by(PipelineTask* task) override; - private: const IRuntimeFilter* _runtime_filter = nullptr; }; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 98e52ec5271613..b969186b178bf7 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -44,8 +44,7 @@ class Pipeline : public std::enable_shared_from_this { friend class PipelineFragmentContext; public: - explicit Pipeline(PipelineId pipeline_id, int num_tasks, - std::weak_ptr context, int num_tasks_of_parent) + explicit Pipeline(PipelineId pipeline_id, int num_tasks, int num_tasks_of_parent) : _pipeline_id(pipeline_id), _num_tasks(num_tasks), _num_tasks_of_parent(num_tasks_of_parent) { @@ -86,7 +85,9 @@ class Pipeline : public std::enable_shared_from_this { std::vector>& children() { return _children; } void set_children(std::shared_ptr child) { _children.push_back(child); } - void set_children(std::vector> children) { _children = children; } + void set_children(std::vector> children) { + _children = std::move(children); + } void incr_created_tasks(int i, PipelineTask* task) { _num_tasks_created++; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index bd45016adf51e6..76d9c347c387fa 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -215,7 +215,6 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { PipelineId id = _next_pipeline_id++; auto pipeline = std::make_shared( id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances, - std::dynamic_pointer_cast(shared_from_this()), parent ? parent->num_tasks() : _num_instances); if (idx >= 0) { _pipelines.insert(_pipelines.begin() + idx, pipeline); diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 6fc6ad8d6f2e48..4caceca20d4a44 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -36,17 +36,14 @@ namespace doris { class ExecEnv; class ThreadPool; - -namespace pipeline { -class TaskQueue; -} // namespace pipeline } // namespace doris namespace doris::pipeline { +class TaskQueue; class TaskScheduler { public: - TaskScheduler(ExecEnv* exec_env, std::shared_ptr task_queue, std::string name, + TaskScheduler(std::shared_ptr task_queue, std::string name, CgroupCpuCtl* cgroup_cpu_ctl) : _task_queue(std::move(task_queue)), _shutdown(false), diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index ff6205bf55e5d0..1c6f6e5cf2cae3 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -383,7 +383,7 @@ Status ExecEnv::init_pipeline_task_scheduler() { // TODO pipeline workload group combie two blocked schedulers. auto t_queue = std::make_shared(executors_size); _without_group_task_scheduler = - new pipeline::TaskScheduler(this, t_queue, "PipeNoGSchePool", nullptr); + new pipeline::TaskScheduler(t_queue, "PipeNoGSchePool", nullptr); RETURN_IF_ERROR(_without_group_task_scheduler->start()); _runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue(); diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 0488e9ec83c6c2..84016132da9b5a 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -470,8 +470,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e } auto task_queue = std::make_shared(executors_size); std::unique_ptr pipeline_task_scheduler = - std::make_unique(exec_env, std::move(task_queue), - "Pipe_" + tg_name, cg_cpu_ctl_ptr); + std::make_unique(std::move(task_queue), "Pipe_" + tg_name, + cg_cpu_ctl_ptr); Status ret = pipeline_task_scheduler->start(); if (ret.ok()) { _task_sched = std::move(pipeline_task_scheduler);