Skip to content

Commit

Permalink
[fix](pipelineX) Fix unexpected OOM on pipelineX
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Jan 2, 2024
1 parent 90b2ee9 commit b35bcd9
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 8 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class AggSinkDependency final : public Dependency {
~AggSinkDependency() override = default;

void set_ready() override {
if (_is_streaming_agg_state()) {
if (_is_streaming_agg_state() && _shared_state) {
if (((SharedState*)Dependency::_shared_state.get())
->data_queue->has_enough_space_to_push()) {
Dependency::set_ready();
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ struct BasicSharedState {
DependencySPtr source_dep = nullptr;
DependencySPtr sink_dep = nullptr;

virtual Status close(RuntimeState* state) { return Status::OK(); }
virtual ~BasicSharedState() = default;
};

Expand Down Expand Up @@ -90,6 +89,7 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
void set_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
_shared_state = shared_state;
}
void clear_shared_state() { _shared_state.reset(); }
virtual std::string debug_string(int indentation_level = 0);

// Start the watcher. We use it to count how long this dependency block the current pipeline task.
Expand Down
10 changes: 4 additions & 6 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,9 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
if (_shared_state) {
RETURN_IF_ERROR(_shared_state->close(state));
}
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
_dependency->clear_shared_state();
}
if (_rows_returned_counter != nullptr) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
Expand Down Expand Up @@ -439,11 +437,11 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
if (_closed) {
return Status::OK();
}
if (_shared_state) {
RETURN_IF_ERROR(_shared_state->close(state));
}
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
if constexpr (!std::is_same_v<LocalExchangeSinkDependency, DependencyType>) {
_dependency->clear_shared_state();
}
}
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
Expand Down

0 comments on commit b35bcd9

Please sign in to comment.