Skip to content

Commit

Permalink
feat: Add ObservedGeneration field in vertex status and use it for ca…
Browse files Browse the repository at this point in the history
…lculating status (#1892)

Signed-off-by: chandankumar4 <[email protected]>
  • Loading branch information
chandankumar4 authored Aug 2, 2024
1 parent 9252c83 commit bc1451a
Show file tree
Hide file tree
Showing 17 changed files with 227 additions and 106 deletions.
5 changes: 5 additions & 0 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -20187,6 +20187,11 @@
"message": {
"type": "string"
},
"observedGeneration": {
"description": "ObservedGeneration stores the generation value observed by the controller.",
"format": "int64",
"type": "integer"
},
"phase": {
"type": "string"
},
Expand Down
5 changes: 5 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -20169,6 +20169,11 @@
"message": {
"type": "string"
},
"observedGeneration": {
"description": "ObservedGeneration stores the generation value observed by the controller.",
"type": "integer",
"format": "int64"
},
"phase": {
"type": "string"
},
Expand Down
3 changes: 3 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5449,6 +5449,9 @@ spec:
type: string
message:
type: string
observedGeneration:
format: int64
type: integer
phase:
enum:
- ""
Expand Down
3 changes: 3 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17837,6 +17837,9 @@ spec:
type: string
message:
type: string
observedGeneration:
format: int64
type: integer
phase:
enum:
- ""
Expand Down
3 changes: 3 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17837,6 +17837,9 @@ spec:
type: string
message:
type: string
observedGeneration:
format: int64
type: integer
phase:
enum:
- ""
Expand Down
19 changes: 19 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -9973,6 +9973,25 @@ Kubernetes meta/v1.Time </a> </em>

</tr>

<tr>

<td>

<code>observedGeneration</code></br> <em> int64 </em>
</td>

<td>

<p>

ObservedGeneration stores the generation value observed by the
controller.
</p>

</td>

</tr>

</tbody>

</table>
Expand Down
174 changes: 99 additions & 75 deletions pkg/apis/numaflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,8 @@ type VertexStatus struct {
Selector string `json:"selector,omitempty" protobuf:"bytes,5,opt,name=selector"`
LastScaledAt metav1.Time `json:"lastScaledAt,omitempty" protobuf:"bytes,4,opt,name=lastScaledAt"`
Status `json:",inline" protobuf:"bytes,7,opt,name=status"`
// ObservedGeneration stores the generation value observed by the controller.
ObservedGeneration int64 `json:"observedGeneration,omitempty" protobuf:"varint,8,opt,name=observedGeneration"`
}

func (vs *VertexStatus) MarkPhase(phase VertexPhase, reason, message string) {
Expand Down Expand Up @@ -868,6 +870,11 @@ func (vs *VertexStatus) IsHealthy() bool {
return vs.IsReady()
}

// SetObservedGeneration records value as the ObservedGeneration of the
// VertexStatus, overwriting whatever was stored before.
func (vs *VertexStatus) SetObservedGeneration(value int64) {
vs.ObservedGeneration = value
}

// +kubebuilder:object:root=true
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type VertexList struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/isbsvc/installer/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,10 +569,10 @@ func (r *jetStreamInstaller) CheckChildrenResourceStatus(ctx context.Context) er
return err
}
// calculate the status of the InterStepBufferService by statefulset status and update the status of isbSvc
if msg, status := getStatefulSetStatus(&isbStatefulSet); status {
r.isbSvc.Status.MarkChildrenResourceHealthy("RolloutFinished", msg)
if msg, reason, status := getStatefulSetStatus(&isbStatefulSet); status {
r.isbSvc.Status.MarkChildrenResourceHealthy(reason, msg)
} else {
r.isbSvc.Status.MarkChildrenResourceNotHealthy("Progressing", msg)
r.isbSvc.Status.MarkChildrenResourceNotHealthy(reason, msg)
}
return nil
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/reconciler/isbsvc/installer/native_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,13 +627,19 @@ func (r *redisInstaller) CheckChildrenResourceStatus(ctx context.Context) error
Namespace: r.isbSvc.Namespace,
Name: generateRedisStatefulSetName(r.isbSvc),
}, &isbStatefulSet); err != nil {
if apierrors.IsNotFound(err) {
r.isbSvc.Status.MarkChildrenResourceNotHealthy("GetStatefulSetFailed",
"StatefulSet not found, might be still under creation")
return nil
}
r.isbSvc.Status.MarkChildrenResourceNotHealthy("GetStatefulSetFailed", err.Error())
return err
}
// calculate the status of the InterStepBufferService by statefulset status and update the status of isbSvc
if msg, status := getStatefulSetStatus(&isbStatefulSet); status {
r.isbSvc.Status.MarkChildrenResourceHealthy("RolloutFinished", msg)
if msg, reason, status := getStatefulSetStatus(&isbStatefulSet); status {
r.isbSvc.Status.MarkChildrenResourceHealthy(reason, msg)
} else {
r.isbSvc.Status.MarkChildrenResourceNotHealthy("Progressing", msg)
r.isbSvc.Status.MarkChildrenResourceNotHealthy(reason, msg)
}
return nil
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/reconciler/isbsvc/installer/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,29 @@ import (

// getStatefulSetStatus returns a message describing the statefulset status, a reason string, and a bool value indicating if the status is considered done.
// Borrowed from kubernetes/kubectl/rollout_status.go https://github.com/kubernetes/kubernetes/blob/cea1d4e20b4a7886d8ff65f34c6d4f95efcb4742/staging/src/k8s.io/kubectl/pkg/polymorphichelpers/rollout_status.go#L130
func getStatefulSetStatus(sts *appv1.StatefulSet) (string, bool) {
func getStatefulSetStatus(sts *appv1.StatefulSet) (string, string, bool) {
if sts.Status.ObservedGeneration == 0 || sts.Generation > sts.Status.ObservedGeneration {
return "Waiting for statefulset spec update to be observed...\n", false
return "Waiting for statefulset spec update to be observed...\n", "Progressing", false
}
if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
return fmt.Sprintf("waiting for statefulset rolling update to complete %d pods at revision %s...\n",
sts.Status.UpdatedReplicas, sts.Status.UpdateRevision), "Progressing", false
}
if sts.Spec.Replicas != nil && sts.Status.ReadyReplicas < *sts.Spec.Replicas {
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *sts.Spec.Replicas-sts.Status.ReadyReplicas), false
return fmt.Sprintf("Waiting for %d pods to be ready...\n", *sts.Spec.Replicas-sts.Status.ReadyReplicas), "Unavailable", false
}
if sts.Spec.UpdateStrategy.Type == appv1.RollingUpdateStatefulSetStrategyType && sts.Spec.UpdateStrategy.RollingUpdate != nil {
if sts.Spec.Replicas != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
if sts.Status.UpdatedReplicas < (*sts.Spec.Replicas - *sts.Spec.UpdateStrategy.RollingUpdate.Partition) {
return fmt.Sprintf(
"Waiting for partitioned roll out to finish: %d out of %d new pods have been updated...\n",
sts.Status.UpdatedReplicas, *sts.Spec.Replicas-*sts.Spec.UpdateStrategy.RollingUpdate.Partition), false
sts.Status.UpdatedReplicas, *sts.Spec.Replicas-*sts.Spec.UpdateStrategy.RollingUpdate.Partition), "Progressing", false
}
}
return fmt.Sprintf("partitioned roll out complete: %d new pods have been updated...\n",
sts.Status.UpdatedReplicas), true
}
if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
return fmt.Sprintf("waiting for statefulset rolling update to complete %d pods at revision %s...\n",
sts.Status.UpdatedReplicas, sts.Status.UpdateRevision), false
sts.Status.UpdatedReplicas), "Healthy", true
}
return fmt.Sprintf(
"statefulset rolling update complete %d pods at revision %s...\n",
sts.Status.CurrentReplicas, sts.Status.CurrentRevision), true
sts.Status.CurrentReplicas, sts.Status.CurrentRevision), "Healthy", true
}
9 changes: 6 additions & 3 deletions pkg/reconciler/isbsvc/installer/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,26 @@ var (
func TestGetStatefulSetStatus(t *testing.T) {
t.Run("Test statefulset status as true", func(t *testing.T) {
testSts := statefulSet.DeepCopy()
msg, status := getStatefulSetStatus(testSts)
msg, reason, status := getStatefulSetStatus(testSts)
assert.Equal(t, "Healthy", reason)
assert.True(t, status)
assert.Equal(t, "statefulset rolling update complete 3 pods at revision isbsvc-default-js-597b7f74d7...\n", msg)
})

t.Run("Test statefulset status as false", func(t *testing.T) {
testSts := statefulSet.DeepCopy()
testSts.Status.UpdateRevision = "isbsvc-default-js-597b7f73a1"
msg, status := getStatefulSetStatus(testSts)
msg, reason, status := getStatefulSetStatus(testSts)
assert.Equal(t, "Progressing", reason)
assert.False(t, status)
assert.Equal(t, "waiting for statefulset rolling update to complete 3 pods at revision isbsvc-default-js-597b7f73a1...\n", msg)
})

t.Run("Test statefulset with ObservedGeneration as zero", func(t *testing.T) {
testSts := statefulSet.DeepCopy()
testSts.Status.ObservedGeneration = 0
msg, status := getStatefulSetStatus(testSts)
msg, reason, status := getStatefulSetStatus(testSts)
assert.Equal(t, "Progressing", reason)
assert.False(t, status)
assert.Equal(t, "Waiting for statefulset spec update to be observed...\n", msg)
})
Expand Down
17 changes: 10 additions & 7 deletions pkg/reconciler/pipeline/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import (
// getVertexStatus will calculate the status of the vertices and return the status and reason
func getVertexStatus(vertices *dfv1.VertexList) (bool, string) {
for _, vertex := range vertices.Items {
if !vertex.Status.IsHealthy() {
if vertex.Status.ObservedGeneration == 0 || vertex.Generation > vertex.Status.ObservedGeneration {
return false, "Progressing"
}
if !vertex.Status.IsHealthy() {
return false, "Unavailable"
}
}
return true, "Successful"
return true, "Healthy"
}

// getDeploymentStatus returns a message describing deployment status, and message with reason where bool value
Expand All @@ -30,21 +33,21 @@ func getDeploymentStatus(deployment *appv1.Deployment) (string, string, bool) {
if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas {
return fmt.Sprintf(
"Waiting for deployment %q rollout to finish: %d out of %d new replicas have been updated...\n",
deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), "DeploymentNotComplete", false
deployment.Name, deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas), "Progressing", false
}
if deployment.Status.Replicas > deployment.Status.UpdatedReplicas {
return fmt.Sprintf(
"Waiting for deployment %q rollout to finish: %d old replicas are pending termination...\n",
deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), "DeploymentNotComplete", false
deployment.Name, deployment.Status.Replicas-deployment.Status.UpdatedReplicas), "Progressing", false
}
if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas {
return fmt.Sprintf(
"Waiting for deployment %q rollout to finish: %d of %d updated replicas are available...\n",
deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), "DeploymentNotComplete", false
deployment.Name, deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas), "Progressing", false
}
return fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name), "DeploymentComplete", true
return fmt.Sprintf("deployment %q successfully rolled out\n", deployment.Name), "Healthy", true
}
return "Waiting for deployment spec update to be observed...", "DeploymentNotComplete", false
return "Waiting for deployment spec update to be observed...", "Progressing", false
}

// GetDeploymentCondition returns the condition with the provided type.
Expand Down
39 changes: 34 additions & 5 deletions pkg/reconciler/pipeline/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestGetDeploymentStatus(t *testing.T) {
t.Run("Test Deployment status as true", func(t *testing.T) {
testDeployment := deployment.DeepCopy()
message, reason, done := getDeploymentStatus(testDeployment)
assert.Equal(t, "DeploymentComplete", reason)
assert.Equal(t, "Healthy", reason)
assert.True(t, done)
assert.Equal(t, "deployment \"test-deployment\" successfully rolled out\n", message)
})
Expand All @@ -43,7 +43,7 @@ func TestGetDeploymentStatus(t *testing.T) {
testDeployment.Status.ObservedGeneration = 0
testDeployment.Status.UpdatedReplicas = 0
message, reason, done := getDeploymentStatus(testDeployment)
assert.Equal(t, "DeploymentNotComplete", reason)
assert.Equal(t, "Progressing", reason)
assert.False(t, done)
assert.Equal(t, "Waiting for deployment \"test-deployment\" rollout to finish: 0 out of 1 new replicas have been updated...\n", message)
})
Expand All @@ -53,7 +53,7 @@ func TestGetDeploymentStatus(t *testing.T) {
testDeployment.Status.UpdatedReplicas = 1
testDeployment.Status.Replicas = 2
message, reason, done := getDeploymentStatus(testDeployment)
assert.Equal(t, "DeploymentNotComplete", reason)
assert.Equal(t, "Progressing", reason)
assert.False(t, done)
assert.Equal(t, "Waiting for deployment \"test-deployment\" rollout to finish: 1 old replicas are pending termination...\n", message)
})
Expand All @@ -64,8 +64,12 @@ func TestGetVertexStatus(t *testing.T) {
vertices := dfv1.VertexList{
Items: []dfv1.Vertex{
{
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
},
Status: dfv1.VertexStatus{
Phase: "Running",
Phase: "Running",
ObservedGeneration: 1,
},
},
},
Expand All @@ -78,7 +82,32 @@ func TestGetVertexStatus(t *testing.T) {
}
status, reason := getVertexStatus(&vertices)
assert.True(t, status)
assert.Equal(t, "Successful", reason)
assert.Equal(t, "Healthy", reason)
})

t.Run("Test Vertex status as false when ObservedGeneration is not matching", func(t *testing.T) {
vertices := dfv1.VertexList{
Items: []dfv1.Vertex{
{
ObjectMeta: metav1.ObjectMeta{
Generation: 2,
},
Status: dfv1.VertexStatus{
Phase: "Running",
ObservedGeneration: 1,
},
},
},
}
vertices.Items[0].Status.Conditions = []metav1.Condition{
{
Type: string(dfv1.VertexConditionPodsHealthy),
Status: metav1.ConditionTrue,
},
}
status, reason := getVertexStatus(&vertices)
assert.False(t, status)
assert.Equal(t, "Progressing", reason)
})

t.Run("Test Vertex status as false", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
vertex.Status.Selector = selector.String()

vertex.Status.MarkPhaseRunning()
vertex.Status.SetObservedGeneration(vertex.Generation)
if err = checkChildrenResourceStatus(ctx, r.client, vertex); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to check children resource status: %w", err)
}
Expand Down

0 comments on commit bc1451a

Please sign in to comment.