
Commit

comments
Signed-off-by: Sidhant Kohli <[email protected]>
kohlisid committed Aug 17, 2024
1 parent 9876d28 commit 87aec3b
Showing 2 changed files with 29 additions and 8 deletions.
35 changes: 27 additions & 8 deletions pkg/mvtxdaemon/server/service/health_status.go
@@ -29,7 +29,7 @@ import (
)

const (
-// healthTimeStep is the frequency at which the health of a vertex is computed
+// healthTimeStep is the frequency at which the health of a MonoVertex is computed
healthTimeStep = 30 * time.Second
)
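
For context, a constant like healthTimeStep is typically consumed by a ticker loop that periodically recomputes and caches the health state. The sketch below is an assumption for illustration only; the actual monitoring loop is not part of this diff, and the names startHealthCheck and newDataHealthResponse are hypothetical (the other identifiers come from this file):

```go
// Sketch only, not from this commit: a plausible consumer of healthTimeStep.
// Assumes the context, time, and mvtxdaemon imports already present in this file.
func (hc *HealthChecker) startHealthCheck(ctx context.Context, metrics *mvtxdaemon.MonoVertexMetrics) {
	ticker := time.NewTicker(healthTimeStep)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Recompute the data criticality and cache it for concurrent readers.
			if state, err := hc.getMonoVertexDataCriticality(ctx, metrics); err == nil {
				// newDataHealthResponse is a hypothetical helper that wraps the state.
				hc.setCurrentHealth(newDataHealthResponse(state))
			}
		}
	}
}
```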

@@ -78,7 +78,8 @@ func NewHealthChecker(monoVertex *v1alpha1.MonoVertex) *HealthChecker {
}
}

-// monoVtxState is a struct which contains the name and state of a vertex
+// monoVtxState is a struct which contains the name and data health state
+// of a MonoVertex
type monoVtxState struct {
// Name is the name of the vertex
Name string `json:"name"`
@@ -94,7 +95,7 @@ func newMonoVtxState(name string, state string) *monoVtxState {
}
}

-// getCurrentHealth returns the current health status of the pipeline.
+// getCurrentHealth returns the current health status of the MonoVertex.
// It is thread safe to ensure concurrent access.
func (hc *HealthChecker) getCurrentHealth() *dataHealthResponse {
// Lock the statusLock to ensure thread safety.
@@ -104,7 +105,7 @@ func (hc *HealthChecker) getCurrentHealth() *dataHealthResponse {
return hc.currentDataStatus
}

-// setCurrentHealth sets the current health status of the pipeline.
+// setCurrentHealth sets the current health status of the MonoVertex.
// It is thread safe to ensure concurrent access.
func (hc *HealthChecker) setCurrentHealth(status *dataHealthResponse) {
// Lock the statusLock to ensure thread safety.
@@ -119,22 +120,41 @@ func (hc *HealthChecker) setCurrentHealth(status *dataHealthResponse) {
// 1. Healthy: The MonoVertex is working as expected
// 2. Warning: The MonoVertex is working but there could be a lag in the data movement
// 3. Critical: The MonoVertex is not working as expected
-// We need to check the following things to determine the data criticality of the pipeline:
-// 1. The buffer usage of each buffer in the pipeline
+// We need to check the following things to determine the data criticality of the MonoVertex:
+// At any given instant of time, what is the desired number of replicas required by the MonoVertex
+// to clear out the backlog within the target processing time?
+// This logic is similar to our scaling logic.
+// Based on the desired replicas, we decide the data criticality.
+//
+// - If the current replicas are equal to the max replicas, and the desired replicas are more than the max replicas,
+// the data criticality is Critical. This means that the MonoVertex is not able to process the data at the rate
+// it is coming in, and due to the specified scale limits we cannot add more replicas either.
+// Otherwise, we consider the data criticality to be Healthy.
+//
+// TODO(MonoVertex): Add the logic to determine the warning state based on more conditions.
func (hc *HealthChecker) getMonoVertexDataCriticality(ctx context.Context, mvtxMetrics *mvtxdaemon.MonoVertexMetrics) (*monoVtxState, error) {
// Get the desired replicas for the MonoVertex based on the metrics
desiredReplicas, err := hc.getDesiredReplica(mvtxMetrics)
if err != nil {
return nil, err
}
// Get the current state of the MonoVertex replicas
currentReplicas := hc.monoVertex.GetReplicas()
maxReplicas := int(hc.monoVertex.Spec.Scale.GetMaxReplicas())
// default status is healthy
status := v1alpha1.MonoVertexStatusHealthy
// If the current replicas are equal to the max replicas, and the desired replicas are more than the max replicas,
// the data criticality is Critical.
if currentReplicas == maxReplicas && desiredReplicas > maxReplicas {
status = v1alpha1.MonoVertexStatusCritical
}
return newMonoVtxState(mvtxMetrics.MonoVertex, status), nil
}
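
To make the decision rule concrete, here is a minimal, self-contained restatement with sample values; the criticality helper and the numbers are illustrative, not part of the commit:

```go
package main

import "fmt"

// criticality restates the rule above: Critical only when we are already
// pinned at max replicas and still need more; otherwise Healthy.
func criticality(current, max, desired int) string {
	if current == max && desired > max {
		return "Critical" // backlog is growing and we cannot scale out further
	}
	return "Healthy"
}

func main() {
	fmt.Println(criticality(5, 5, 7)) // Critical: at max, but 7 replicas are needed
	fmt.Println(criticality(3, 5, 7)) // Healthy: the autoscaler can still add replicas
}
```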

// getDesiredReplica calculates the desired replicas based on the processing rate and pending information
// of the MonoVertex. This logic is similar to our scaling logic.
// But unlike scaling, where we adjust the desired replicas within the provided scale limits,
// here we just calculate the desired replica count and return it.
func (hc *HealthChecker) getDesiredReplica(mvtxMetrics *mvtxdaemon.MonoVertexMetrics) (int, error) {
totalRate := float64(0)
totalPending := int64(0)
@@ -171,10 +191,9 @@ func (hc *HealthChecker) getDesiredReplica(mvtxMetrics *mvtxdaemon.MonoVertexMetrics) (int, error) {
return int(hc.monoVertex.Status.Replicas), nil
}

-var desired int32
// We calculate the time of finishing processing the pending messages,
// and then we know how many replicas are needed to get them done in target seconds.
-desired = int32(math.Round(((float64(totalPending) / totalRate) / float64(hc.monoVertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(hc.monoVertex.Status.Replicas)))
+desired := int32(math.Round(((float64(totalPending) / totalRate) / float64(hc.monoVertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(hc.monoVertex.Status.Replicas)))
return int(desired), nil
}
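
A quick worked example of the formula above, with hypothetical numbers: with 6000 pending messages and an aggregate rate of 20 msg/s, draining the backlog takes 300 s at the current 2 replicas; to finish within a 30 s target we need round(300/30 * 2) = 20 replicas.

```go
package main

import (
	"fmt"
	"math"
)

// Worked example of the desired-replica formula above; all values are
// hypothetical, chosen only to show the arithmetic.
func main() {
	totalPending := 6000.0 // pending messages
	totalRate := 20.0      // messages/second across current replicas
	targetSeconds := 30.0  // Spec.Scale.GetTargetProcessingSeconds()
	replicas := 2.0        // current Status.Replicas

	drainSeconds := totalPending / totalRate                       // 300s to clear the backlog at the current rate
	desired := math.Round(drainSeconds / targetSeconds * replicas) // 20 replicas to clear it in 30s
	fmt.Printf("desired replicas: %.0f\n", desired)
}
```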

2 changes: 2 additions & 0 deletions server/apis/v1/handler.go
@@ -1159,11 +1159,13 @@ func (h *handler) GetMonoVertexHealth(c *gin.Context) {
return
}

+// Create a new daemon client to get the data status
client, err := h.getMonoVertexDaemonClient(ns, monoVertex)
if err != nil || client == nil {
h.respondWithError(c, fmt.Sprintf("failed to get daemon service client for mono vertex %q, %s", monoVertex, err.Error()))
return
}
+// Data level health status
dataHealth, err := client.GetMonoVertexStatus(c)
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to get the mono vertex dataStatus: namespace %q mono vertex %q: %s", ns, monoVertex, err.Error()))
