Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Jan 1, 2025
1 parent d3f7536 commit 8d0c808
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 16 deletions.
1 change: 1 addition & 0 deletions pkg/mvtxdaemon/server/service/mvtx_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (mvs *MonoVertexService) startHealthCheck(ctx context.Context) {
}
}

// GetMonoVertexLookback returns the current lookback of the MonoVertex from the rater
func (mvs *MonoVertexService) GetMonoVertexLookback(ctx context.Context, empty *emptypb.Empty) (*mvtxdaemon.GetMonoVertexLookbackResponse, error) {
return &mvtxdaemon.GetMonoVertexLookbackResponse{
Lookback: mvs.rater.GetLookBack(),
Expand Down
9 changes: 3 additions & 6 deletions pkg/mvtxdaemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ type Rater struct {
timestampedPodCounts *sharedqueue.OverflowQueue[*TimestampedCounts]
// timestampedPodProcessingTime is a map between vertex name and a queue of timestamped processing times for that vertex
timestampedPodProcessingTime *sharedqueue.OverflowQueue[*TimestampedProcessingTime]
// userSpecifiedLookBackSeconds is a map between vertex name and the user-specified lookback seconds for that vertex
// userSpecifiedLookBackSeconds is the current lookback, in seconds, for the MonoVertex.
// It can be updated dynamically and defaults to the user-specified value in the spec.
userSpecifiedLookBackSeconds *atomic.Float64
options *options
}

// GetLookBack reports the lookback seconds currently in effect for the
// MonoVertex, wrapped as a protobuf DoubleValue.
func (r *Rater) GetLookBack() *wrapperspb.DoubleValue {
	// Take a single atomic snapshot of the value before wrapping it.
	lookbackSeconds := r.userSpecifiedLookBackSeconds.Load()
	return wrapperspb.Double(lookbackSeconds)
}
Expand Down Expand Up @@ -167,7 +168,6 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int) error
now := time.Now().Add(CountWindow).Truncate(CountWindow).Unix()
UpdateCount(r.timestampedPodCounts, now, podReadCount)
UpdateProcessingTime(r.timestampedPodProcessingTime, now, processingTime)
r.log.Infof("MYDEBUG: processing rate vertex %s is: %v", pInfo.monoVertexName, r.timestampedPodProcessingTime)
return nil
}

Expand Down Expand Up @@ -214,9 +214,6 @@ func (r *Rater) GetRates() map[string]*wrapperspb.DoubleValue {
var result = make(map[string]*wrapperspb.DoubleValue)
// calculate rates for each lookback seconds
for n, i := range r.buildLookbackSecondsMap() {
if n == "default" {
r.log.Infof("MYDEBUG: lookback %d", i)
}
rate := CalculateRate(r.timestampedPodCounts, i)
result[n] = wrapperspb.Double(rate)
}
Expand Down
34 changes: 25 additions & 9 deletions pkg/mvtxdaemon/server/service/rater/rater_lookback_utils.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rater

import (
Expand All @@ -10,9 +26,9 @@ import (
sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue"
)

// PodProcessingTime is a struct to maintain processing time for a batch by a pod
// PodProcessingTime is a struct to maintain the required data
// to calculate the processing time for a batch by a pod
type PodProcessingTime struct {
// pod name
name string
processingTimeSum float64
processingTimeCount float64
Expand All @@ -22,7 +38,7 @@ func (p *PodProcessingTime) Name() string {
return p.name
}

func (p *PodProcessingTime) processingTime() (float64, float64) {
// processingTimeValues returns the pod's processing-time sum and
// processing-time count, in that order.
func (p *PodProcessingTime) processingTimeValues() (float64, float64) {
	sum, count := p.processingTimeSum, p.processingTimeCount
	return sum, count
}

Expand All @@ -35,7 +51,6 @@ func (r *Rater) updateDynamicLookbackSecs() {
processingTimeSeconds, update := r.CalculateVertexProcessingTime(r.timestampedPodProcessingTime)
r.log.Infof("MYDEBUG: processingTimeSeconds %f ", processingTimeSeconds)
if !update {
r.log.Infof("MYDEBUG: NO UPDATE processingTimeSeconds %f ", processingTimeSeconds)
return
}
// if the current calculated processing time is greater than the lookback Seconds, update it
Expand All @@ -44,15 +59,17 @@ func (r *Rater) updateDynamicLookbackSecs() {
// Round up to the nearest minute so that stepping up and stepping down yield the same value
// for a given processingTimeSeconds, then convert back to seconds.
roundedProcessingTime := 60 * int(math.Ceil(processingTimeSeconds/60))
// step up case
if roundedProcessingTime > int(currentVal) {
r.userSpecifiedLookBackSeconds.Store(math.Min(MaxLookback.Seconds(), float64(roundedProcessingTime)))
r.log.Infof("MYDEBUG: Updated for vertex %s, old %f new %d", vertexName, currentVal, roundedProcessingTime)
r.log.Infof("Lookback updated for mvtx %s, old %f new %d", vertexName, currentVal, roundedProcessingTime)
} else {
// step down case
// We should not be setting values lower than the lookBackSeconds defined in the spec
roundedProcessingTime = int(math.Max(float64(roundedProcessingTime), float64(r.monoVertex.Spec.Scale.GetLookbackSeconds())))
if roundedProcessingTime != int(currentVal) {
r.userSpecifiedLookBackSeconds.Store(float64(roundedProcessingTime))
r.log.Infof("MYDEBUG: Updated for vertex %s, old %f new %d", vertexName, currentVal, roundedProcessingTime)
r.log.Infof("Lookback updated for mvtx %s, old %f new %d", vertexName, currentVal, roundedProcessingTime)
}
}
}
Expand Down Expand Up @@ -109,7 +126,6 @@ func (r *Rater) CalculateVertexProcessingTime(q *sharedqueue.OverflowQueue[*Time
average := totalTime / float64(totalCnt)
if average > maxAverage {
maxAverage = average
r.log.Infof("MYDEBUG: Using %d vals %f ", totalCnt, average)
}
}

Expand All @@ -118,7 +134,7 @@ func (r *Rater) CalculateVertexProcessingTime(q *sharedqueue.OverflowQueue[*Time
return maxAverage, true
}

// getPodProcessingTime
// getPodProcessingTime is a utility function to get the metrics from the pod
func (r *Rater) getPodProcessingTime(podName string) *PodProcessingTime {
processingTimeCountMetric := "monovtx_processing_time"
headlessServiceName := r.monoVertex.GetHeadlessServiceName()
Expand Down Expand Up @@ -149,7 +165,7 @@ func (r *Rater) getPodProcessingTime(podName string) *PodProcessingTime {
podCount = float64(ele.Histogram.GetSampleCount())
podSum = ele.Histogram.GetSampleSum()
} else {
r.log.Infof("[Pod name %s]: Metric %q is unavailable, the pod might haven't started processing data", podName, processingTimeCountMetric)
r.log.Debugf("[Pod name %s]: Metric %q is unavailable, the pod might haven't started processing data", podName, processingTimeCountMetric)
return nil
}
return &PodProcessingTime{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (tc *TimestampedProcessingTime) Update(podReadCount *PodProcessingTime) {
if podReadCount == nil {
return
}
sum, count := podReadCount.processingTime()
sum, count := podReadCount.processingTimeValues()

// Convert microseconds to seconds
microseconds := sum / count
Expand Down

0 comments on commit 8d0c808

Please sign in to comment.