From c92f8f26cd67058bea39453ccff2ad333f264082 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 22 Dec 2023 09:25:12 -0800 Subject: [PATCH 1/2] destination: Fix GetProfile endpoint buffering (#11815) In 71635cb and 357a1d3 we updated the endpoint and profile translators to prevent backpressure from stalling the informer tasks. This change updates the endpoint profile translator with the same fix, so that updates are buffered and can detect when when a gRPC stream is stalled. Furthermore, the update method is updated to use a protobuf-aware comparison method instead of using serialization and string comparison. A test is added for the endpoint profile translator, since none existed previously. --- .../endpoint_profile_translator.go | 102 +++++++++++--- .../endpoint_profile_translator_test.go | 125 ++++++++++++++++++ controller/api/destination/server.go | 13 +- controller/api/destination/test_util.go | 2 +- .../api/destination/watcher/pod_watcher.go | 35 +++-- 5 files changed, 238 insertions(+), 39 deletions(-) create mode 100644 controller/api/destination/endpoint_profile_translator_test.go diff --git a/controller/api/destination/endpoint_profile_translator.go b/controller/api/destination/endpoint_profile_translator.go index 8ca312b1563d8..bef7f47050cf2 100644 --- a/controller/api/destination/endpoint_profile_translator.go +++ b/controller/api/destination/endpoint_profile_translator.go @@ -6,7 +6,10 @@ import ( pb "github.com/linkerd/linkerd2-proxy-api/go/destination" "github.com/linkerd/linkerd2/controller/api/destination/watcher" "github.com/linkerd/linkerd2/controller/k8s" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" logging "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" ) type endpointProfileTranslator struct { @@ -14,25 +17,43 @@ type endpointProfileTranslator struct { controllerNS string identityTrustDomain string defaultOpaquePorts map[uint32]struct{} - stream pb.Destination_GetProfileServer - lastMessage string + + stream pb.Destination_GetProfileServer + endStream chan struct{} + + updates chan *watcher.Address + stop chan struct{} + + current *pb.DestinationProfile k8sAPI *k8s.API metadataAPI *k8s.MetadataAPI log *logging.Entry } -// newEndpointProfileTranslator translates pod updates and protocol updates to +// endpointProfileUpdatesQueueOverflowCounter is a prometheus counter that is incremented +// whenever the profile updates queue overflows. +// +// We omit ip and port labels because they are high cardinality. +var endpointProfileUpdatesQueueOverflowCounter = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "endpoint_profile_updates_queue_overflow", + Help: "A counter incremented whenever the endpoint profile updates queue overflows", + }, +) + +// newEndpointProfileTranslator translates pod updates and profile updates to // DestinationProfiles for endpoints func newEndpointProfileTranslator( enableH2Upgrade bool, controllerNS, identityTrustDomain string, defaultOpaquePorts map[uint32]struct{}, - log *logging.Entry, - stream pb.Destination_GetProfileServer, k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, + stream pb.Destination_GetProfileServer, + endStream chan struct{}, + log *logging.Entry, ) *endpointProfileTranslator { return &endpointProfileTranslator{ enableH2Upgrade: enableH2Upgrade, @@ -40,19 +61,67 @@ func newEndpointProfileTranslator( identityTrustDomain: identityTrustDomain, defaultOpaquePorts: defaultOpaquePorts, stream: stream, + endStream: endStream, + updates: make(chan *watcher.Address, updateQueueCapacity), + stop: make(chan struct{}), k8sAPI: k8sAPI, metadataAPI: metadataAPI, log: log.WithField("component", "endpoint-profile-translator"), } } -// Update sends a DestinationProfile message into the stream, if the same -// message hasn't been sent already. If it has, false is returned. -func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, error) { +// Start initiates a goroutine which processes update events off of the +// endpointProfileTranslator's internal queue and sends to the grpc stream as +// appropriate. The goroutine calls non-thread-safe Send, therefore Start must +// not be called more than once. +func (ept *endpointProfileTranslator) Start() { + go func() { + for { + select { + case update := <-ept.updates: + ept.update(update) + case <-ept.stop: + return + } + } + }() +} + +// Stop terminates the goroutine started by Start. +func (ept *endpointProfileTranslator) Stop() { + close(ept.stop) +} + +// Update enqueues an address update to be translated into a DestinationProfile. +// An error is returned if the update cannot be enqueued. +func (ept *endpointProfileTranslator) Update(address *watcher.Address) error { + select { + case ept.updates <- address: + // Update has been successfully enqueued. + return nil + default: + select { + case <-ept.endStream: + // The endStream channel has already been closed so no action is + // necessary. + return fmt.Errorf("profile update stream closed") + default: + // We are unable to enqueue because the channel does not have capacity. + // The stream has fallen too far behind and should be closed. + endpointProfileUpdatesQueueOverflowCounter.Inc() + close(ept.endStream) + return fmt.Errorf("profile update queue full; aborting stream") + } + } +} + +func (ept *endpointProfileTranslator) update(address *watcher.Address) { opaquePorts := watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts) endpoint, err := ept.createEndpoint(*address, opaquePorts) if err != nil { - return false, fmt.Errorf("failed to create endpoint: %w", err) + ept.log.Errorf("Failed to create endpoint for %s:%d: %s", + address.IP, address.Port, err) + return } // The protocol for an endpoint should only be updated if there is a pod, @@ -79,17 +148,18 @@ func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, er Endpoint: endpoint, OpaqueProtocol: address.OpaqueProtocol, } - msg := profile.String() - if msg == ept.lastMessage { - return false, nil + if proto.Equal(profile, ept.current) { + ept.log.Debugf("Ignoring redundant profile update: %+v", profile) + return } - ept.lastMessage = msg - ept.log.Debugf("sending protocol update: %+v", profile) + + ept.log.Debugf("Sending profile update: %+v", profile) if err := ept.stream.Send(profile); err != nil { - return false, fmt.Errorf("failed to send protocol update: %w", err) + ept.log.Errorf("failed to send profile update: %s", err) + return } - return true, nil + ept.current = profile } func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) { diff --git a/controller/api/destination/endpoint_profile_translator_test.go b/controller/api/destination/endpoint_profile_translator_test.go new file mode 100644 index 0000000000000..f15fd8baa9028 --- /dev/null +++ b/controller/api/destination/endpoint_profile_translator_test.go @@ -0,0 +1,125 @@ +package destination + +import ( + "testing" + "time" + + pb "github.com/linkerd/linkerd2-proxy-api/go/destination" + "github.com/linkerd/linkerd2/controller/api/destination/watcher" + consts "github.com/linkerd/linkerd2/pkg/k8s" + logging "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestEndpointProfileTranslator(t *testing.T) { + // logging.SetLevel(logging.TraceLevel) + // defer logging.SetLevel(logging.PanicLevel) + + addr := &watcher.Address{ + IP: "10.10.11.11", + Port: 8080, + } + podAddr := &watcher.Address{ + IP: "10.10.11.11", + Port: 8080, + Pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + consts.ProxyOpaquePortsAnnotation: "8080", + }, + }, + }, + } + + t.Run("Sends update", func(t *testing.T) { + mockGetProfileServer := &mockDestinationGetProfileServer{ + profilesReceived: make(chan *pb.DestinationProfile, 1), + } + log := logging.WithField("test", t.Name()) + translator := newEndpointProfileTranslator( + true, "cluster", "identity", make(map[uint32]struct{}), + mockGetProfileServer, + nil, + log, + ) + translator.Start() + defer translator.Stop() + + if err := translator.Update(addr); err != nil { + t.Fatal("Expected update") + } + select { + case p := <-mockGetProfileServer.profilesReceived: + log.Debugf("Received update: %v", p) + case <-time.After(1 * time.Second): + t.Fatal("No update received") + } + + if err := translator.Update(addr); err != nil { + t.Fatal("Unexpected update") + } + select { + case p := <-mockGetProfileServer.profilesReceived: + t.Fatalf("Duplicate update sent: %v", p) + case <-time.After(1 * time.Second): + } + + if err := translator.Update(podAddr); err != nil { + t.Fatal("Expected update") + } + select { + case p := <-mockGetProfileServer.profilesReceived: + log.Debugf("Received update: %v", p) + case <-time.After(1 * time.Second): + } + }) + + t.Run("Handles overflow", func(t *testing.T) { + mockGetProfileServer := &mockDestinationGetProfileServer{ + profilesReceived: make(chan *pb.DestinationProfile, 1), + } + log := logging.WithField("test", t.Name()) + endStream := make(chan struct{}) + translator := newEndpointProfileTranslator( + true, "cluster", "identity", make(map[uint32]struct{}), + mockGetProfileServer, + endStream, + log, + ) + translator.Start() + defer translator.Stop() + + for i := 0; i < updateQueueCapacity/2; i++ { + if err := translator.Update(podAddr); err != nil { + t.Fatal("Expected update") + } + select { + case <-endStream: + t.Fatal("Stream ended prematurely") + default: + } + + if err := translator.Update(addr); err != nil { + t.Fatal("Expected update") + } + select { + case <-endStream: + t.Fatal("Stream ended prematurely") + default: + } + } + + if err := translator.Update(podAddr); err == nil { + t.Fatal("Expected update to fail") + } + select { + case <-endStream: + default: + t.Fatal("Stream should have ended") + } + + // XXX We should assert that endpointProfileUpdatesQueueOverflowCounter + // == 1 but we can't read counter values. + }) +} diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index 4fadda88a6bce..ab60eef1d3323 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -484,16 +484,21 @@ func (s *server) subscribeToEndpointProfile( log *logging.Entry, stream pb.Destination_GetProfileServer, ) error { + canceled := stream.Context().Done() + streamEnd := make(chan struct{}) translator := newEndpointProfileTranslator( s.enableH2Upgrade, s.controllerNS, s.identityTrustDomain, s.defaultOpaquePorts, - log, - stream, s.k8sAPI, s.metadataAPI, + stream, + streamEnd, + log, ) + translator.Start() + defer translator.Stop() var err error ip, err = s.pods.Subscribe(service, hostname, ip, port, translator) @@ -504,8 +509,10 @@ func (s *server) subscribeToEndpointProfile( select { case <-s.shutdown: - case <-stream.Context().Done(): + case <-canceled: s.log.Debugf("Cancelled") + case <-streamEnd: + log.Errorf("GetProfile %s:%d stream aborted", ip, port) } return nil } diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index df79bab9a8249..73961b001fdc0 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -460,7 +460,7 @@ spec: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } log := logging.WithField("test", t.Name()) - logging.SetLevel(logging.TraceLevel) + // logging.SetLevel(logging.TraceLevel) defaultOpaquePorts := map[uint32]struct{}{ 25: {}, 443: {}, diff --git a/controller/api/destination/watcher/pod_watcher.go b/controller/api/destination/watcher/pod_watcher.go index ad19b9f2da4d0..ae9090e578a19 100644 --- a/controller/api/destination/watcher/pod_watcher.go +++ b/controller/api/destination/watcher/pod_watcher.go @@ -53,7 +53,7 @@ type ( // PodUpdateListener is the interface subscribers must implement. PodUpdateListener interface { - Update(*Address) (bool, error) + Update(*Address) error } ) @@ -115,13 +115,10 @@ func (pw *PodWatcher) Subscribe(service *ServiceID, hostname, ip string, port Po return "", err } - sent, err := listener.Update(&address) - if err != nil { - return "", err - } - if sent { - pp.metrics.incUpdates() + if err = listener.Update(&address); err != nil { + return "", fmt.Errorf("failed to send initial update: %w", err) } + pp.metrics.incUpdates() return pp.ip, nil } @@ -261,11 +258,11 @@ func (pw *PodWatcher) updateServers(_ any) { pw.log.Errorf("Error creating address for pod: %s", err) continue } - sent, err := listener.Update(&addr) - if err != nil { - pw.log.Errorf("Error calling pod watcher listener for server update: %s", err) + if err = listener.Update(&addr); err != nil { + pw.log.Warnf("Error sending update to listener: %s", err) + continue } - updated = updated || sent + updated = true } if updated { pp.metrics.incUpdates() @@ -464,11 +461,11 @@ func (pp *podPublisher) updatePod(pod *corev1.Pod) { pp.log.Errorf("Error creating address for pod: %s", err) continue } - sent, err := l.Update(&addr) - if err != nil { - pp.log.Errorf("Error calling pod watcher listener for pod update: %s", err) + if err = l.Update(&addr); err != nil { + pp.log.Warnf("Error sending update to listener: %s", err) + continue } - updated = updated || sent + updated = true } if updated { pp.metrics.incUpdates() @@ -487,11 +484,11 @@ func (pp *podPublisher) updatePod(pod *corev1.Pod) { pp.log.Errorf("Error creating address for pod: %s", err) continue } - sent, err := l.Update(&addr) - if err != nil { - pp.log.Errorf("Error calling pod watcher listener for pod deletion: %s", err) + if err = l.Update(&addr); err != nil { + pp.log.Warnf("Error sending update to listener: %s", err) + continue } - updated = updated || sent + updated = true } if updated { pp.metrics.incUpdates() From c907c64ade949e4fb62325368a01db112a1416ec Mon Sep 17 00:00:00 2001 From: Alejandro Pedraza Date: Wed, 3 Jan 2024 09:47:06 -0500 Subject: [PATCH 2/2] stable-2.14.8 This stable release fixes an issue in the control plane where discovery for pod IP addresses could hang indefinitely ([#11815]). --- CHANGES.md | 7 +++++++ charts/linkerd-control-plane/Chart.yaml | 2 +- charts/linkerd-control-plane/README.md | 2 +- .../api/destination/endpoint_profile_translator_test.go | 2 ++ jaeger/charts/linkerd-jaeger/Chart.yaml | 2 +- jaeger/charts/linkerd-jaeger/README.md | 2 +- multicluster/charts/linkerd-multicluster/Chart.yaml | 2 +- multicluster/charts/linkerd-multicluster/README.md | 2 +- viz/charts/linkerd-viz/Chart.yaml | 2 +- viz/charts/linkerd-viz/README.md | 2 +- 10 files changed, 17 insertions(+), 8 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 83937f15a0d54..17ba7b17e4327 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## stable-2.14.8 + +This stable release fixes an issue in the control plane where discovery for pod +IP addresses could hang indefinitely ([#11815]). + +[#11815]: https://github.com/linkerd/linkerd2/pull/11815 + ## stable-2.14.7 This stable release fixes two bugs in the Linkerd control plane. diff --git a/charts/linkerd-control-plane/Chart.yaml b/charts/linkerd-control-plane/Chart.yaml index 47d4485253143..f7fd02fe30091 100644 --- a/charts/linkerd-control-plane/Chart.yaml +++ b/charts/linkerd-control-plane/Chart.yaml @@ -16,7 +16,7 @@ dependencies: - name: partials version: 0.1.0 repository: file://../partials -version: 1.16.8 +version: 1.16.9 icon: https://linkerd.io/images/logo-only-200h.png maintainers: - name: Linkerd authors diff --git a/charts/linkerd-control-plane/README.md b/charts/linkerd-control-plane/README.md index 04e347d3b7b59..8658a541756ad 100644 --- a/charts/linkerd-control-plane/README.md +++ b/charts/linkerd-control-plane/README.md @@ -3,7 +3,7 @@ Linkerd gives you observability, reliability, and security for your microservices — with no code change required. -![Version: 1.16.8](https://img.shields.io/badge/Version-1.16.8-informational?style=flat-square) +![Version: 1.16.9](https://img.shields.io/badge/Version-1.16.9-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: edge-XX.X.X](https://img.shields.io/badge/AppVersion-edge--XX.X.X-informational?style=flat-square) diff --git a/controller/api/destination/endpoint_profile_translator_test.go b/controller/api/destination/endpoint_profile_translator_test.go index f15fd8baa9028..3a4112f10f0c3 100644 --- a/controller/api/destination/endpoint_profile_translator_test.go +++ b/controller/api/destination/endpoint_profile_translator_test.go @@ -39,6 +39,7 @@ func TestEndpointProfileTranslator(t *testing.T) { log := logging.WithField("test", t.Name()) translator := newEndpointProfileTranslator( true, "cluster", "identity", make(map[uint32]struct{}), + nil, nil, mockGetProfileServer, nil, log, @@ -83,6 +84,7 @@ func TestEndpointProfileTranslator(t *testing.T) { endStream := make(chan struct{}) translator := newEndpointProfileTranslator( true, "cluster", "identity", make(map[uint32]struct{}), + nil, nil, mockGetProfileServer, endStream, log, diff --git a/jaeger/charts/linkerd-jaeger/Chart.yaml b/jaeger/charts/linkerd-jaeger/Chart.yaml index 30bf3a83e3bc4..f78aa90b20314 100644 --- a/jaeger/charts/linkerd-jaeger/Chart.yaml +++ b/jaeger/charts/linkerd-jaeger/Chart.yaml @@ -11,7 +11,7 @@ kubeVersion: ">=1.21.0-0" name: linkerd-jaeger sources: - https://github.com/linkerd/linkerd2/ -version: 30.12.8 +version: 30.12.9 icon: https://linkerd.io/images/logo-only-200h.png maintainers: - name: Linkerd authors diff --git a/jaeger/charts/linkerd-jaeger/README.md b/jaeger/charts/linkerd-jaeger/README.md index 995b53cab6f5c..af67bcee16b71 100644 --- a/jaeger/charts/linkerd-jaeger/README.md +++ b/jaeger/charts/linkerd-jaeger/README.md @@ -3,7 +3,7 @@ The Linkerd-Jaeger extension adds distributed tracing to Linkerd using OpenCensus and Jaeger. -![Version: 30.12.8](https://img.shields.io/badge/Version-30.12.8-informational?style=flat-square) +![Version: 30.12.9](https://img.shields.io/badge/Version-30.12.9-informational?style=flat-square) ![AppVersion: edge-XX.X.X](https://img.shields.io/badge/AppVersion-edge--XX.X.X-informational?style=flat-square) diff --git a/multicluster/charts/linkerd-multicluster/Chart.yaml b/multicluster/charts/linkerd-multicluster/Chart.yaml index 186a623549c3e..c8bad2f85f0e7 100644 --- a/multicluster/charts/linkerd-multicluster/Chart.yaml +++ b/multicluster/charts/linkerd-multicluster/Chart.yaml @@ -11,7 +11,7 @@ kubeVersion: ">=1.21.0-0" name: "linkerd-multicluster" sources: - https://github.com/linkerd/linkerd2/ -version: 30.11.8 +version: 30.11.9 icon: https://linkerd.io/images/logo-only-200h.png maintainers: - name: Linkerd authors diff --git a/multicluster/charts/linkerd-multicluster/README.md b/multicluster/charts/linkerd-multicluster/README.md index 21572a231fb1a..2a4343bc6a483 100644 --- a/multicluster/charts/linkerd-multicluster/README.md +++ b/multicluster/charts/linkerd-multicluster/README.md @@ -3,7 +3,7 @@ The Linkerd-Multicluster extension contains resources to support multicluster linking to remote clusters -![Version: 30.11.8](https://img.shields.io/badge/Version-30.11.8-informational?style=flat-square) +![Version: 30.11.9](https://img.shields.io/badge/Version-30.11.9-informational?style=flat-square) ![AppVersion: edge-XX.X.X](https://img.shields.io/badge/AppVersion-edge--XX.X.X-informational?style=flat-square) diff --git a/viz/charts/linkerd-viz/Chart.yaml b/viz/charts/linkerd-viz/Chart.yaml index 37fdfcb617bf1..f86dacd107002 100644 --- a/viz/charts/linkerd-viz/Chart.yaml +++ b/viz/charts/linkerd-viz/Chart.yaml @@ -11,7 +11,7 @@ kubeVersion: ">=1.21.0-0" name: "linkerd-viz" sources: - https://github.com/linkerd/linkerd2/ -version: 30.12.8 +version: 30.12.9 icon: https://linkerd.io/images/logo-only-200h.png maintainers: - name: Linkerd authors diff --git a/viz/charts/linkerd-viz/README.md b/viz/charts/linkerd-viz/README.md index a4f886b79f2d3..c0648a1b7a737 100644 --- a/viz/charts/linkerd-viz/README.md +++ b/viz/charts/linkerd-viz/README.md @@ -3,7 +3,7 @@ The Linkerd-Viz extension contains observability and visualization components for Linkerd. -![Version: 30.12.8](https://img.shields.io/badge/Version-30.12.8-informational?style=flat-square) +![Version: 30.12.9](https://img.shields.io/badge/Version-30.12.9-informational?style=flat-square) ![AppVersion: edge-XX.X.X](https://img.shields.io/badge/AppVersion-edge--XX.X.X-informational?style=flat-square)