Skip to content

Commit

Permalink
fix: avoid call finalizeStage more times than once.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 31, 2024
1 parent 9596ea4 commit 04ce4e7
Showing 1 changed file with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ import pekko.stream.stage._
logics(i).handlers.length
}

// Marks for if a stage has been finalized (finalizeStage been called) or not
private[this] val finalizedMark = Array.fill(logics.length)(false)

private[this] var _subFusingMaterializer: Materializer = _
def subFusingMaterializer: Materializer = _subFusingMaterializer

Expand Down Expand Up @@ -590,14 +593,21 @@ import pekko.stream.stage._
}

def afterStageHasRun(logic: GraphStageLogic): Unit =
if (isStageCompleted(logic)) {
if (isStageCompleted(logic) && !isStageFinalized(logic)) {
markStageFinalized(logic)
runningStages -= 1
finalizeStage(logic)
}

// Returns true if the given stage is already completed
def isStageCompleted(stage: GraphStageLogic): Boolean = stage != null && shutdownCounter(stage.stageId) == 0

// Returns true if the given stage is already finalized
private def isStageFinalized(stage: GraphStageLogic): Boolean = finalizedMark(stage.stageId)

// Mark the given stage as finalized
private def markStageFinalized(stage: GraphStageLogic): Unit = finalizedMark(stage.stageId) = true

// Register that a connection in which the given stage participated has been completed and therefore the stage
// itself might stop, too.
private def completeConnection(stageId: Int): Unit = {
Expand Down

0 comments on commit 04ce4e7

Please sign in to comment.