Skip to content

Commit

Permalink
chore: use readyReplicas to calculate desired replicas
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Sep 10, 2024
1 parent ba40b15 commit 2b4b951
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 18 deletions.
28 changes: 17 additions & 11 deletions pkg/reconciler/monovertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,6 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int)
return nil
}

var err error
daemonClient, _ := s.mvtxDaemonClientsCache.Get(monoVtx.GetDaemonServiceURL())
if daemonClient == nil {
daemonClient, err = mvtxdaemonclient.NewGRPCClient(monoVtx.GetDaemonServiceURL())
if err != nil {
return fmt.Errorf("failed to get daemon service client for MonoVertex %s, %w", monoVtx.Name, err)
}
s.mvtxDaemonClientsCache.Add(monoVtx.GetDaemonServiceURL(), daemonClient)
}

if monoVtx.Status.Replicas == 0 { // Was scaled to 0
// Periodically wake them up from 0 replicas to 1, to peek for the incoming messages
if secondsSinceLastScale >= float64(monoVtx.Spec.Scale.GetZeroReplicaSleepSeconds()) {
Expand All @@ -204,6 +194,22 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int)
}
}

// There's no ready pods, skip scaling
if monoVtx.Status.ReadyReplicas == 0 {
log.Infof("MonoVertex has no ready replicas, skip scaling.")
return nil
}

var err error
daemonClient, _ := s.mvtxDaemonClientsCache.Get(monoVtx.GetDaemonServiceURL())
if daemonClient == nil {
daemonClient, err = mvtxdaemonclient.NewGRPCClient(monoVtx.GetDaemonServiceURL())
if err != nil {
return fmt.Errorf("failed to get daemon service client for MonoVertex %s, %w", monoVtx.Name, err)
}
s.mvtxDaemonClientsCache.Add(monoVtx.GetDaemonServiceURL(), daemonClient)
}

vMetrics, err := daemonClient.GetMonoVertexMetrics(ctx)
if err != nil {
return fmt.Errorf("failed to get metrics of mono vertex key %q, %w", key, err)
Expand Down Expand Up @@ -282,7 +288,7 @@ func (s *Scaler) desiredReplicas(_ context.Context, monoVtx *dfv1.MonoVertex, pr
var desired int32
// We calculate the time of finishing processing the pending messages,
// and then we know how many replicas are needed to get them done in target seconds.
desired = int32(math.Round(((float64(pending) / processingRate) / float64(monoVtx.Spec.Scale.GetTargetProcessingSeconds())) * float64(monoVtx.Status.Replicas)))
desired = int32(math.Round(((float64(pending) / processingRate) / float64(monoVtx.Spec.Scale.GetTargetProcessingSeconds())) * float64(monoVtx.Status.ReadyReplicas)))

// we only scale down to zero when the pending and rate are both zero.
if desired == 0 {
Expand Down
24 changes: 17 additions & 7 deletions pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
s.StopWatching(key) // Remove it in case it's watched.
return nil
}
if vertex.Status.Phase != dfv1.VertexPhaseRunning {
log.Infof("Vertex not in Running phase, skip scaling.")
return nil
}
if vertex.Status.UpdateHash != vertex.Status.CurrentHash && vertex.Status.UpdateHash != "" {
log.Info("Vertex is updating, skip scaling.")
return nil
}
secondsSinceLastScale := time.Since(vertex.Status.LastScaledAt.Time).Seconds()
scaleDownCooldown := float64(vertex.Spec.Scale.GetScaleDownCooldownSeconds())
scaleUpCooldown := float64(vertex.Spec.Scale.GetScaleUpCooldownSeconds())
Expand All @@ -178,10 +186,6 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
log.Infof("Cooldown period, skip scaling.")
return nil
}
if vertex.Status.Phase != dfv1.VertexPhaseRunning {
log.Infof("Vertex not in Running phase, skip scaling.")
return nil
}
pl := &dfv1.Pipeline{}
if err := s.client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: vertex.Spec.PipelineName}, pl); err != nil {
if apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -246,6 +250,12 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
}
}

// Vertex pods are not ready yet.
if vertex.Status.ReadyReplicas == 0 {
log.Infof("Vertex %q has no ready replicas, skip scaling.", vertex.Name)
return nil
}

vMetrics, err := daemonClient.GetVertexMetrics(ctx, pl.Name, vertex.Spec.Name)
if err != nil {
return fmt.Errorf("failed to get metrics of vertex key %q, %w", key, err)
Expand Down Expand Up @@ -289,7 +299,7 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
}

var desired int32
current := int32(vertex.GetReplicas())
current := int32(vertex.Status.Replicas)
// if both totalRate and totalPending are 0, we scale down to 0
// since pending contains the pending acks, we can scale down to 0.
if totalPending == 0 && totalRate == 0 {
Expand Down Expand Up @@ -370,15 +380,15 @@ func (s *Scaler) desiredReplicas(_ context.Context, vertex *dfv1.Vertex, partiti
if vertex.IsASource() {
// For sources, we calculate the time of finishing processing the pending messages,
// and then we know how many replicas are needed to get them done in target seconds.
desired = int32(math.Round(((float64(pending) / rate) / float64(vertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(vertex.Status.Replicas)))
desired = int32(math.Round(((float64(pending) / rate) / float64(vertex.Spec.Scale.GetTargetProcessingSeconds())) * float64(vertex.Status.ReadyReplicas)))
} else {
// For UDF and sinks, we calculate the available buffer length, and consider it is the contribution of current replicas,
// then we figure out how many replicas are needed to keep the available buffer length at target level.
if pending >= partitionBufferLengths[i] {
// Simply return current replica number + max allowed if the pending messages are more than available buffer length
desired = int32(vertex.Status.Replicas) + int32(vertex.Spec.Scale.GetReplicasPerScaleUp())
} else {
singleReplicaContribution := float64(partitionBufferLengths[i]-pending) / float64(vertex.Status.Replicas)
singleReplicaContribution := float64(partitionBufferLengths[i]-pending) / float64(vertex.Status.ReadyReplicas)
desired = int32(math.Round(float64(partitionAvailableBufferLengths[i]) / singleReplicaContribution))
}
}
Expand Down

0 comments on commit 2b4b951

Please sign in to comment.