Skip to content

Commit

Permalink
[Refactor](exec) change some unreasonable code in dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee committed Oct 21, 2024
1 parent 4ec6414 commit 5060a32
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 10 deletions.
7 changes: 4 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1283,16 +1283,17 @@ void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTim
_filter_timer.push_back(timer);
}

void IRuntimeFilter::set_dependency(std::shared_ptr<pipeline::Dependency> dependency) {
void IRuntimeFilter::set_finish_dependency(
const std::shared_ptr<pipeline::CountedFinishDependency>& dependency) {
_dependency = dependency;
((pipeline::CountedFinishDependency*)_dependency.get())->add();
_dependency->add();
CHECK(_dependency);
}

void IRuntimeFilter::set_synced_size(uint64_t global_size) {
_synced_size = global_size;
if (_dependency) {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
_dependency->sub();
}
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct RuntimeFilterContextSPtr;

namespace pipeline {
class RuntimeFilterTimer;
class CountedFinishDependency;
} // namespace pipeline

enum class RuntimeFilterType {
Expand Down Expand Up @@ -352,7 +353,8 @@ class IRuntimeFilter {

void set_synced_size(uint64_t global_size);

void set_dependency(std::shared_ptr<pipeline::Dependency> dependency);
void set_finish_dependency(
const std::shared_ptr<pipeline::CountedFinishDependency>& dependency);

int64_t get_synced_size() const { return _synced_size; }

Expand Down Expand Up @@ -422,7 +424,7 @@ class IRuntimeFilter {
std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;

int64_t _synced_size = -1;
std::shared_ptr<pipeline::Dependency> _dependency;
std::shared_ptr<pipeline::CountedFinishDependency> _dependency;
};

// avoid expose RuntimePredicateWrapper
Expand Down
6 changes: 3 additions & 3 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ class VRuntimeFilterSlots {
}

Status send_filter_size(RuntimeState* state, uint64_t hash_table_size,
std::shared_ptr<pipeline::Dependency> dependency) {
std::shared_ptr<pipeline::CountedFinishDependency> dependency) {
if (_runtime_filters.empty()) {
return Status::OK();
}
for (auto runtime_filter : _runtime_filters) {
if (runtime_filter->need_sync_filter_size()) {
runtime_filter->set_dependency(dependency);
runtime_filter->set_finish_dependency(dependency);
}
}

// send_filter_size may call dependency->sub(), so we call set_dependency firstly for all rf to avoid dependency set_ready repeatedly
// send_filter_size may call dependency->sub(), so we call set_finish_dependency firstly for all rf to avoid dependency set_ready repeatedly
for (auto runtime_filter : _runtime_filters) {
if (runtime_filter->need_sync_filter_size()) {
RETURN_IF_ERROR(runtime_filter->send_filter_size(state, hash_table_size));
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ struct FakeSharedState final : public BasicSharedState {
ENABLE_FACTORY_CREATOR(FakeSharedState)
};

struct CountedFinishDependency final : public Dependency {
class CountedFinishDependency final : public Dependency {
public:
using SharedState = FakeSharedState;
CountedFinishDependency(int id, int node_id, std::string name)
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class HashJoinBuildSinkLocalState final
*/
bool _build_side_ignore_null = false;
std::vector<int> _build_col_ids;
std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<CountedFinishDependency> _finish_dependency;

RuntimeProfile::Counter* _build_table_timer = nullptr;
RuntimeProfile::Counter* _build_expr_call_timer = nullptr;
Expand Down

0 comments on commit 5060a32

Please sign in to comment.