Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add informer lag histograms #11534

Merged
merged 6 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 73 additions & 3 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
"github.com/linkerd/linkerd2/controller/k8s"
Expand Down Expand Up @@ -168,7 +169,7 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log
ew.svcHandle, err = k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addService,
DeleteFunc: ew.deleteService,
UpdateFunc: func(_, obj interface{}) { ew.addService(obj) },
UpdateFunc: ew.updateService,
})
if err != nil {
return nil, err
Expand All @@ -177,7 +178,7 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log
ew.srvHandle, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addServer,
DeleteFunc: ew.deleteServer,
UpdateFunc: func(_, obj interface{}) { ew.addServer(obj) },
UpdateFunc: ew.updateServer,
})
if err != nil {
return nil, err
Expand All @@ -199,7 +200,7 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *log
ew.epHandle, err = k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ew.addEndpoints,
DeleteFunc: ew.deleteEndpoints,
UpdateFunc: func(_, obj interface{}) { ew.addEndpoints(obj) },
UpdateFunc: ew.updateEndpoints,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -292,6 +293,20 @@ func (ew *EndpointsWatcher) addService(obj interface{}) {
sp.updateService(service)
}

func (ew *EndpointsWatcher) updateService(oldObj interface{}, newObj interface{}) {
oldService := oldObj.(*corev1.Service)
newService := newObj.(*corev1.Service)

oldUpdated := latestUpdated(oldService.ManagedFields)
updated := latestUpdated(newService.ManagedFields)
if !updated.IsZero() && updated != oldUpdated {
delta := time.Since(updated)
serviceInformerLag.Observe(delta.Seconds())
}

ew.addService(newObj)
}

func (ew *EndpointsWatcher) deleteService(obj interface{}) {
service, ok := obj.(*corev1.Service)
if !ok {
Expand Down Expand Up @@ -330,6 +345,30 @@ func (ew *EndpointsWatcher) addEndpoints(obj interface{}) {
sp.updateEndpoints(endpoints)
}

func (ew *EndpointsWatcher) updateEndpoints(oldObj interface{}, newObj interface{}) {
oldEndpoints, ok := oldObj.(*corev1.Endpoints)
if !ok {
ew.log.Errorf("error processing endpoints resource, got %#v expected *corev1.Endpoints", oldObj)
return
}
newEndpoints, ok := newObj.(*corev1.Endpoints)
if !ok {
ew.log.Errorf("error processing endpoints resource, got %#v expected *corev1.Endpoints", newObj)
return
}

oldUpdated := latestUpdated(oldEndpoints.ManagedFields)
updated := latestUpdated(newEndpoints.ManagedFields)
if !updated.IsZero() && updated != oldUpdated {
delta := time.Since(updated)
endpointsInformerLag.Observe(delta.Seconds())
}

id := ServiceID{newEndpoints.Namespace, newEndpoints.Name}
sp := ew.getOrNewServicePublisher(id)
sp.updateEndpoints(newEndpoints)
}

func (ew *EndpointsWatcher) deleteEndpoints(obj interface{}) {
endpoints, ok := obj.(*corev1.Endpoints)
if !ok {
Expand Down Expand Up @@ -384,6 +423,12 @@ func (ew *EndpointsWatcher) updateEndpointSlice(oldObj interface{}, newObj inter
ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", newObj)
return
}
oldUpdated := latestUpdated(oldSlice.ManagedFields)
updated := latestUpdated(newSlice.ManagedFields)
if !updated.IsZero() && updated != oldUpdated {
delta := time.Since(updated)
endpointsliceInformerLag.Observe(delta.Seconds())
}

id, err := getEndpointSliceServiceID(newSlice)
if err != nil {
Expand Down Expand Up @@ -466,6 +511,19 @@ func (ew *EndpointsWatcher) addServer(obj interface{}) {
}
}

func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) {
oldServer := oldObj.(*v1beta1.Server)
newServer := newObj.(*v1beta1.Server)
oldUpdated := latestUpdated(oldServer.ManagedFields)
updated := latestUpdated(newServer.ManagedFields)
if !updated.IsZero() && updated != oldUpdated {
delta := time.Since(updated)
serverInformerLag.Observe(delta.Seconds())
}

ew.addServer(newObj)
}

func (ew *EndpointsWatcher) deleteServer(obj interface{}) {
ew.Lock()
defer ew.Unlock()
Expand Down Expand Up @@ -1338,3 +1396,15 @@ func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error {
}
return nil
}

func latestUpdated(managedFields []metav1.ManagedFieldsEntry) time.Time {
var latest time.Time
for _, field := range managedFields {
if field.Operation == metav1.ManagedFieldsOperationUpdate {
if latest.IsZero() || field.Time.After(latest) {
latest = field.Time.Time
}
}
}
return latest
}
16 changes: 15 additions & 1 deletion controller/api/destination/watcher/opaque_ports_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package watcher
import (
"strconv"
"sync"
"time"

"github.com/linkerd/linkerd2/controller/k8s"
labels "github.com/linkerd/linkerd2/pkg/k8s"
Expand Down Expand Up @@ -59,7 +60,7 @@ func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[
_, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: opw.addService,
DeleteFunc: opw.deleteService,
UpdateFunc: func(_, obj interface{}) { opw.addService(obj) },
UpdateFunc: opw.updateService,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -133,6 +134,19 @@ func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpd
}
}

func (opw *OpaquePortsWatcher) updateService(oldObj interface{}, newObj interface{}) {
newSvc := newObj.(*corev1.Service)
oldSvc := oldObj.(*corev1.Service)

oldUpdated := latestUpdated(oldSvc.ManagedFields)
updated := latestUpdated(newSvc.ManagedFields)
if !updated.IsZero() && updated != oldUpdated {
delta := time.Since(updated)
serviceInformerLag.Observe(delta.Seconds())
}
opw.addService(newObj)
}

func (opw *OpaquePortsWatcher) addService(obj interface{}) {
opw.Lock()
defer opw.Unlock()
Expand Down
32 changes: 28 additions & 4 deletions controller/api/destination/watcher/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
"github.com/linkerd/linkerd2/controller/k8s"
consts "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/util"
Expand Down Expand Up @@ -78,9 +80,9 @@ func NewPodWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.E
}

_, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pw.updateServer,
DeleteFunc: pw.updateServer,
UpdateFunc: func(_, obj interface{}) { pw.updateServer(obj) },
AddFunc: pw.updateServers,
DeleteFunc: pw.updateServers,
UpdateFunc: pw.updateServer,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -177,6 +179,14 @@ func (pw *PodWatcher) updatePod(oldObj any, newObj any) {
// this is just a mark, wait for actual deletion event
return
}

oldUpdated := latestUpdated(oldPod.ManagedFields)
updated := latestUpdated(newPod.ManagedFields)
if !updated.IsZero() && updated != oldUpdated {
delta := time.Since(updated)
podInformerLag.Observe(delta.Seconds())
}

pw.log.Tracef("Updated pod %s.%s", newPod.Name, newPod.Namespace)
go pw.submitPodUpdate(newPod, false)
}
Expand Down Expand Up @@ -221,10 +231,24 @@ func (pw *PodWatcher) submitPodUpdate(pod *corev1.Pod, remove bool) {
}
}

func (pw *PodWatcher) updateServer(oldObj interface{}, newObj interface{}) {
oldServer := oldObj.(*v1beta1.Server)
newServer := newObj.(*v1beta1.Server)

oldUpdated := latestUpdated(oldServer.ManagedFields)
updated := latestUpdated(newServer.ManagedFields)
if !updated.IsZero() && updated != oldUpdated {
delta := time.Since(updated)
serverInformerLag.Observe(delta.Seconds())
}

pw.updateServers(newObj)
}

// updateServer triggers an Update() call to the listeners of the podPublishers
// whose pod matches the Server's selector. This function is an event handler
// so it cannot block.
func (pw *PodWatcher) updateServer(_ any) {
func (pw *PodWatcher) updateServers(_ any) {
pw.mu.RLock()
defer pw.mu.RUnlock()

Expand Down
11 changes: 11 additions & 0 deletions controller/api/destination/watcher/profile_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package watcher

import (
"sync"
"time"

sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
splisters "github.com/linkerd/linkerd2/controller/gen/client/listers/serviceprofile/v1alpha2"
Expand Down Expand Up @@ -105,6 +106,16 @@ func (pw *ProfileWatcher) addProfile(obj interface{}) {
}

func (pw *ProfileWatcher) updateProfile(old interface{}, new interface{}) {
oldProfile := old.(*sp.ServiceProfile)
newProfile := new.(*sp.ServiceProfile)

oldUpdated := latestUpdated(oldProfile.ManagedFields)
updated := latestUpdated(newProfile.ManagedFields)
if !updated.IsZero() && updated != oldUpdated {
delta := time.Since(updated)
serviceProfileInformerLag.Observe(delta.Seconds())
}

pw.addProfile(new)
}

Expand Down
62 changes: 62 additions & 0 deletions controller/api/destination/watcher/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,68 @@ type (
}
)

var (
informer_lag_seconds_buckets = []float64{
0.5, // 500ms
1, // 1s
2.5, // 2.5s
5, // 5s
10, // 10s
25, // 25s
50, // 50s
100, // 1m 40s
250, // 4m 10s
1000, // 16m 40s
}
endpointsInformerLag = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "endpoints_informer_lag_seconds",
Help: "The amount of time between when an Endpoints resource is updated and when an informer observes it",
Buckets: informer_lag_seconds_buckets,
},
)

endpointsliceInformerLag = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "endpointslices_informer_lag_seconds",
Help: "The amount of time between when an EndpointSlice resource is updated and when an informer observes it",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Help: "The amount of time between when an EndpointSlice resource is updated and when an informer observes it",
Help: "The amount of time between when an EndpointSlices resource is updated and when an informer observes it",

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is correct as it is:

:; k get endpointslices.discovery.k8s.io -o yaml |grep kind
  kind: EndpointSlice
kind: List

Buckets: informer_lag_seconds_buckets,
},
)

serviceInformerLag = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "services_informer_lag_seconds",
Help: "The amount of time between when a Service resource is updated and when an informer observes it",
Buckets: informer_lag_seconds_buckets,
},
)

serverInformerLag = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "servers_informer_lag_seconds",
Help: "The amount of time between when a Server resource is updated and when an informer observes it",
Buckets: informer_lag_seconds_buckets,
},
)

podInformerLag = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "pods_informer_lag_seconds",
Help: "The amount of time between when a Pod resource is updated and when an informer observes it",
Buckets: informer_lag_seconds_buckets,
},
)

serviceProfileInformerLag = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "serviceprofiles_informer_lag_seconds",
Help: "The amount of time between when a ServiceProfile resource is updated and when an informer observes it",
Buckets: informer_lag_seconds_buckets,
},
)
)

func newMetricsVecs(name string, labels []string) metricsVecs {
subscribers := promauto.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down
Loading