
chore: check for pause timeout after errors #2084

Merged · 10 commits · Oct 2, 2024
36 changes: 22 additions & 14 deletions pkg/reconciler/pipeline/controller.go
@@ -845,37 +845,45 @@ func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipelin
return updated, err
Contributor:

I guess if for some reason we return from here repeatedly and indefinitely, we will not honor pause timeout. Is that possible?

Contributor Author:

Here, if we are not able to scale down, then there is some issue at the K8s end. We should not force a pause in such a scenario.

Contributor (@juliev0, Sep 26, 2024):

Okay, I suppose I'm okay with that if that's what the numaflow team prefers. In my mind, I imagined keeping it simple and just saying a pause timeout is honored in all cases, especially since this function and/or the call to scaleDownSourceVertices() could morph over time and potentially include other calls that could fail, and I wouldn't want to accidentally end up in the same situation then. But maybe I'm being too paranoid.

I see what you're saying about not wanting to risk data loss because of a Kubernetes issue... like if there's some K8s outage, then all of the pipelines would have data loss... okay, maybe a good call. :)

Contributor:

I think the PR looks good to me in that case. Thanks for doing this!

Contributor Author (@kohlisid, Sep 26, 2024):

I would say that if there are issues arising from K8s itself while doing scaleDownSourceVertices, we have bigger problems at hand for the pipeline, and they should get reflected in the status. So, like I said, we can skip forcefully pausing for this one.

Regarding "morph over time" - we will take care that the design is maintained :)

I will let @whynowy chime in in case he has something else to add.
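
(For readers skimming this thread, here is a minimal, illustrative sketch of the alternative juliev0 describes — honoring the pause timeout on every error path, including a failed scale-down — rather than the behavior merged in this PR. The reconciler struct, its fields, and the stubbed scaleDownSourceVertices below are simplified stand-ins, not numaflow's actual types or API.)

```go
// Sketch of "honor the pause timeout on every error path".
// All types here are simplified stand-ins, not numaflow's real API.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type reconciler struct {
	pauseRequestedAt time.Time
	gracePeriod      time.Duration
}

// scaleDownSourceVertices stands in for the real call, which may fail
// because of a Kubernetes-side problem.
func (r *reconciler) scaleDownSourceVertices(ctx context.Context) error {
	return errors.New("k8s API unavailable")
}

// pausePipeline returns (requeue, error). In this variant the deadline is
// checked even when scale-down fails, so a persistent error cannot block
// the pause forever.
func (r *reconciler) pausePipeline(ctx context.Context) (bool, error) {
	scaleErr := r.scaleDownSourceVertices(ctx)

	if time.Now().After(r.pauseRequestedAt.Add(r.gracePeriod)) {
		// Deadline exceeded: force the pause despite the error.
		fmt.Printf("forcing pause after timeout (last error: %v)\n", scaleErr)
		return false, nil
	}
	// Deadline not reached yet: surface the error and requeue.
	return true, scaleErr
}

func main() {
	r := &reconciler{pauseRequestedAt: time.Now().Add(-2 * time.Minute), gracePeriod: time.Minute}
	requeue, err := r.pausePipeline(context.Background())
	fmt.Println("requeue:", requeue, "err:", err)
}
```

The merged PR instead keeps the early return on scale-down errors and applies the timeout handling only to daemon/drain errors, as the diff below shows.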

}

daemonClient, err := daemonclient.NewGRPCDaemonServiceClient(pl.GetDaemonServiceURL())
if err != nil {
return true, err
}
defer func() {
_ = daemonClient.Close()
}()
drainCompleted, err := daemonClient.IsDrained(ctx, pl.Name)
if err != nil {
return true, err
var daemonError error
var drainCompleted = false

// Check for the daemon to obtain the buffer draining information, in case we see an error trying to
// retrieve this we do not exit prematurely to allow honoring the pause timeout for a consistent error
// - In case the timeout has not occurred we would trigger a requeue
// - If the timeout has occurred even after getting the drained error, we will try to pause the pipeline
daemonClient, daemonError := daemonclient.NewGRPCDaemonServiceClient(pl.GetDaemonServiceURL())
if daemonClient != nil {
defer func() {
_ = daemonClient.Close()
}()
drainCompleted, err = daemonClient.IsDrained(ctx, pl.Name)
if err != nil {
daemonError = err
}
}

pauseTimestamp, err := time.Parse(time.RFC3339, pl.GetAnnotations()[dfv1.KeyPauseTimestamp])
if err != nil {
return false, err
}

// if drain is completed, or we have exceeded the pause deadline, mark pl as paused and scale down
if time.Now().After(pauseTimestamp.Add(time.Duration(pl.Spec.Lifecycle.GetPauseGracePeriodSeconds())*time.Second)) || drainCompleted {
_, err := r.scaleDownAllVertices(ctx, pl)
_, err = r.scaleDownAllVertices(ctx, pl)
if err != nil {
return true, err
}
// if the drain completed succesfully, then set the DrainedOnPause field to true
if daemonError != nil {
r.logger.Errorf("error in fetching Drained status, Pausing due to timeout: %v", zap.Error(err))
}
// if the drain completed successfully, then set the DrainedOnPause field to true
if drainCompleted {
pl.Status.MarkDrainedOnPauseTrue()
}
pl.Status.MarkPhasePaused()
return false, nil
}
return true, nil
return true, daemonError
}

func (r *pipelineReconciler) scaleDownSourceVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
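
Stepping back from the diff, the following is a condensed, illustrative sketch of the control flow this change introduces: a daemon error no longer short-circuits pausePipeline; the pause deadline is still evaluated, and the error is only propagated while the deadline has not passed. The daemonClient interface, reconciler struct, and the checkDrainAndPause/fakeDaemon names below are simplified stand-ins for illustration, not numaflow's actual controller code.

```go
// Condensed sketch of the merged error-handling flow; all types are
// simplified stand-ins, not numaflow's real API.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type daemonClient interface {
	IsDrained(ctx context.Context, pipeline string) (bool, error)
	Close() error
}

type reconciler struct {
	pauseTimestamp time.Time
	gracePeriod    time.Duration
}

// checkDrainAndPause mirrors the new logic: a daemon error no longer causes
// an early return; the pause deadline is always evaluated, and the error is
// only propagated while the deadline has not yet passed.
func (r *reconciler) checkDrainAndPause(ctx context.Context, dc daemonClient, name string) (bool, error) {
	var daemonError error
	drainCompleted := false

	if dc != nil {
		defer func() { _ = dc.Close() }()
		drained, err := dc.IsDrained(ctx, name)
		if err != nil {
			daemonError = err
		} else {
			drainCompleted = drained
		}
	}

	if time.Now().After(r.pauseTimestamp.Add(r.gracePeriod)) || drainCompleted {
		if daemonError != nil {
			// Log and proceed: we are pausing because of the timeout.
			fmt.Printf("error fetching drained status, pausing due to timeout: %v\n", daemonError)
		}
		// The real controller marks DrainedOnPause and the Paused phase here.
		return false, nil
	}
	// Not drained and deadline not reached: requeue, surfacing any daemon error.
	return true, daemonError
}

// fakeDaemon simulates a daemon whose IsDrained call fails.
type fakeDaemon struct{}

func (fakeDaemon) IsDrained(context.Context, string) (bool, error) {
	return false, errors.New("daemon unreachable")
}
func (fakeDaemon) Close() error { return nil }

func main() {
	r := &reconciler{pauseTimestamp: time.Now().Add(-90 * time.Second), gracePeriod: time.Minute}
	requeue, err := r.checkDrainAndPause(context.Background(), fakeDaemon{}, "my-pipeline")
	fmt.Println("requeue:", requeue, "err:", err)
}
```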