Skip to content

Commit

Permalink
chore: add app.kubernetes.io/name label (#1230)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Oct 19, 2023
1 parent 265a026 commit 3a12b59
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 30 deletions.
1 change: 1 addition & 0 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func (p Pipeline) GetDaemonDeploymentObj(req GetDaemonDeploymentReq) (*appv1.Dep
KeyPartOf: Project,
KeyManagedBy: ControllerPipeline,
KeyComponent: ComponentDaemon,
KeyAppName: p.GetDaemonDeploymentName(),
KeyPipelineName: p.Name,
}
spec := appv1.DeploymentSpec{
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/numaflow/v1alpha1/side_inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (si SideInput) getManagerDeploymentObj(pipeline Pipeline, req GetSideInputD
KeyPartOf: Project,
KeyManagedBy: ControllerPipeline,
KeyComponent: ComponentSideInputManager,
KeyAppName: pipeline.GetSideInputsManagerDeploymentName(si.Name),
KeyPipelineName: pipeline.Name,
KeySideInputName: si.Name,
}
Expand Down
68 changes: 38 additions & 30 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,28 +380,33 @@ func (r *pipelineReconciler) createOrUpdateDaemonDeployment(ctx context.Context,
deployHash := sharedutil.MustHash(deploy.Spec)
deploy.Annotations = map[string]string{dfv1.KeyHash: deployHash}
existingDeploy := &appv1.Deployment{}
needToCreate := false
if err := r.client.Get(ctx, types.NamespacedName{Namespace: pl.Namespace, Name: deploy.Name}, existingDeploy); err != nil {
if apierrors.IsNotFound(err) {
if err := r.client.Create(ctx, deploy); err != nil && !apierrors.IsAlreadyExists(err) {
log.Errorw("Failed to create a daemon deployment", zap.String("deployment", deploy.Name), zap.Error(err))
pl.Status.MarkDeployFailed("CreateDaemonDeployFailed", err.Error())
return fmt.Errorf("failed to create a daemon deployment, %w", err)
}
log.Infow("Succeeded to create a daemon deployment", zap.String("deployment", deploy.Name))
} else {
if !apierrors.IsNotFound(err) {
log.Errorw("Failed to find existing daemon deployment", zap.String("deployment", deploy.Name), zap.Error(err))
pl.Status.MarkDeployFailed("FindDaemonDeployFailed", err.Error())
return fmt.Errorf("failed to find existing daemon deployment, %w", err)
} else {
needToCreate = true
}
} else {
if existingDeploy.GetAnnotations()[dfv1.KeyHash] != deployHash {
// Delete and recreate, to avoid updating immutable fields problem.
if err := r.client.Delete(ctx, existingDeploy); err != nil {
log.Errorw("Failed to delete the outdated daemon deployment", zap.String("deployment", existingDeploy.Name), zap.Error(err))
pl.Status.MarkDeployFailed("DeleteOldDaemonDeployFailed", err.Error())
return fmt.Errorf("failed to delete an outdated daemon deployment, %w", err)
}
needToCreate = true
}
} else if existingDeploy.GetAnnotations()[dfv1.KeyHash] != deployHash {
existingDeploy.Annotations[dfv1.KeyHash] = deployHash
existingDeploy.Spec = deploy.Spec
if err := r.client.Update(ctx, existingDeploy); err != nil {
log.Errorw("Failed to update a daemon deployment", zap.String("deployment", existingDeploy.Name), zap.Error(err))
pl.Status.MarkDeployFailed("UpdateDaemonDeployFailed", err.Error())
return fmt.Errorf("failed to update a daemon deployment, %w", err)
}
if needToCreate {
if err := r.client.Create(ctx, deploy); err != nil && !apierrors.IsAlreadyExists(err) {
log.Errorw("Failed to create a daemon deployment", zap.String("deployment", deploy.Name), zap.Error(err))
pl.Status.MarkDeployFailed("CreateDaemonDeployFailed", err.Error())
return fmt.Errorf("failed to create a daemon deployment, %w", err)
}
log.Infow("Succeeded to update daemon deployment", zap.String("deployment", existingDeploy.Name))
log.Infow("Succeeded to created/recreatd a daemon deployment", zap.String("deployment", deploy.Name))
}
return nil
}
Expand Down Expand Up @@ -447,27 +452,30 @@ func (r *pipelineReconciler) createOrUpdateSIMDeployments(ctx context.Context, p
newObj.Annotations = make(map[string]string)
}
newObj.Annotations[dfv1.KeyHash] = deployHash
if oldObj, existing := existingObjs[newObj.Name]; !existing {
needToCreate := false
if oldObj, existing := existingObjs[newObj.Name]; existing {
if oldObj.GetAnnotations()[dfv1.KeyHash] != newObj.GetAnnotations()[dfv1.KeyHash] {
// Delete and recreate, to avoid updating immutable fields problem.
if err := r.client.Delete(ctx, &oldObj); err != nil {
pl.Status.MarkDeployFailed("DeleteOldSIMDeploymentFailed", err.Error())
return fmt.Errorf("failed to delete old Side Inputs Manager Deployment %q, %w", oldObj.Name, err)
}
needToCreate = true
}
delete(existingObjs, oldObj.Name)
} else {
needToCreate = true
}
if needToCreate {
if err := r.client.Create(ctx, newObj); err != nil {
if apierrors.IsAlreadyExists(err) { // probably somebody else already created it
continue
} else {
pl.Status.MarkDeployFailed("CreateSIMDeploymentFailed", err.Error())
return fmt.Errorf("failed to create Side Inputs Manager Deployment %q, %w", newObj.Name, err)
return fmt.Errorf("failed to create/recreate Side Inputs Manager Deployment %q, %w", newObj.Name, err)
}
}
log.Infow("Created Side Inputs Manager Deployment successfully", zap.String("deployment", newObj.Name))
} else {
if oldObj.GetAnnotations()[dfv1.KeyHash] != newObj.GetAnnotations()[dfv1.KeyHash] { // need to update
oldObj.Spec = newObj.Spec
oldObj.Annotations[dfv1.KeyHash] = newObj.GetAnnotations()[dfv1.KeyHash]
if err := r.client.Update(ctx, &oldObj); err != nil {
pl.Status.MarkDeployFailed("UpdateSIMDeploymentFailed", err.Error())
return fmt.Errorf("failed to update Side Inputs Manager Deployment %q, %w", oldObj.Name, err)
}
log.Infow("Updated Side Inputs Manager Deployment successfully", zap.String("deployment", oldObj.Name))
}
delete(existingObjs, oldObj.Name)
log.Infow("Succeeded to create/recreate Side Inputs Manager Deployment", zap.String("deployment", newObj.Name))
}
}
for _, v := range existingObjs {
Expand Down
1 change: 1 addition & 0 deletions pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
labels[dfv1.KeyPartOf] = dfv1.Project
labels[dfv1.KeyManagedBy] = dfv1.ControllerVertex
labels[dfv1.KeyComponent] = dfv1.ComponentVertex
labels[dfv1.KeyAppName] = vertex.Name
labels[dfv1.KeyPipelineName] = vertex.Spec.PipelineName
labels[dfv1.KeyVertexName] = vertex.Spec.Name
annotations[dfv1.KeyHash] = hash
Expand Down

0 comments on commit 3a12b59

Please sign in to comment.