diff --git a/CHANGES.md b/CHANGES.md index 83937f15a0d54..17ba7b17e4327 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## stable-2.14.8 + +This stable release fixes an issue in the control plane where discovery for pod +IP addresses could hang indefinitely ([#11815]). + +[#11815]: https://github.com/linkerd/linkerd2/pull/11815 + ## stable-2.14.7 This stable release fixes two bugs in the Linkerd control plane. diff --git a/charts/linkerd-control-plane/Chart.yaml b/charts/linkerd-control-plane/Chart.yaml index 47d4485253143..f7fd02fe30091 100644 --- a/charts/linkerd-control-plane/Chart.yaml +++ b/charts/linkerd-control-plane/Chart.yaml @@ -16,7 +16,7 @@ dependencies: - name: partials version: 0.1.0 repository: file://../partials -version: 1.16.8 +version: 1.16.9 icon: https://linkerd.io/images/logo-only-200h.png maintainers: - name: Linkerd authors diff --git a/charts/linkerd-control-plane/README.md b/charts/linkerd-control-plane/README.md index 04e347d3b7b59..8658a541756ad 100644 --- a/charts/linkerd-control-plane/README.md +++ b/charts/linkerd-control-plane/README.md @@ -3,7 +3,7 @@ Linkerd gives you observability, reliability, and security for your microservices — with no code change required. -![Version: 1.16.8](https://img.shields.io/badge/Version-1.16.8-informational?style=flat-square) +![Version: 1.16.9](https://img.shields.io/badge/Version-1.16.9-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: edge-XX.X.X](https://img.shields.io/badge/AppVersion-edge--XX.X.X-informational?style=flat-square) diff --git a/controller/api/destination/endpoint_profile_translator.go b/controller/api/destination/endpoint_profile_translator.go index 8ca312b1563d8..bef7f47050cf2 100644 --- a/controller/api/destination/endpoint_profile_translator.go +++ b/controller/api/destination/endpoint_profile_translator.go @@ -6,7 +6,10 @@ import ( pb "github.com/linkerd/linkerd2-proxy-api/go/destination" "github.com/linkerd/linkerd2/controller/api/destination/watcher" "github.com/linkerd/linkerd2/controller/k8s" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" logging "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" ) type endpointProfileTranslator struct { @@ -14,25 +17,43 @@ type endpointProfileTranslator struct { controllerNS string identityTrustDomain string defaultOpaquePorts map[uint32]struct{} - stream pb.Destination_GetProfileServer - lastMessage string + + stream pb.Destination_GetProfileServer + endStream chan struct{} + + updates chan *watcher.Address + stop chan struct{} + + current *pb.DestinationProfile k8sAPI *k8s.API metadataAPI *k8s.MetadataAPI log *logging.Entry } -// newEndpointProfileTranslator translates pod updates and protocol updates to +// endpointProfileUpdatesQueueOverflowCounter is a prometheus counter that is incremented +// whenever the profile updates queue overflows. +// +// We omit ip and port labels because they are high cardinality. +var endpointProfileUpdatesQueueOverflowCounter = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "endpoint_profile_updates_queue_overflow", + Help: "A counter incremented whenever the endpoint profile updates queue overflows", + }, +) + +// newEndpointProfileTranslator translates pod updates and profile updates to // DestinationProfiles for endpoints func newEndpointProfileTranslator( enableH2Upgrade bool, controllerNS, identityTrustDomain string, defaultOpaquePorts map[uint32]struct{}, - log *logging.Entry, - stream pb.Destination_GetProfileServer, k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, + stream pb.Destination_GetProfileServer, + endStream chan struct{}, + log *logging.Entry, ) *endpointProfileTranslator { return &endpointProfileTranslator{ enableH2Upgrade: enableH2Upgrade, @@ -40,19 +61,67 @@ func newEndpointProfileTranslator( identityTrustDomain: identityTrustDomain, defaultOpaquePorts: defaultOpaquePorts, stream: stream, + endStream: endStream, + updates: make(chan *watcher.Address, updateQueueCapacity), + stop: make(chan struct{}), k8sAPI: k8sAPI, metadataAPI: metadataAPI, log: log.WithField("component", "endpoint-profile-translator"), } } -// Update sends a DestinationProfile message into the stream, if the same -// message hasn't been sent already. If it has, false is returned. -func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, error) { +// Start initiates a goroutine which processes update events off of the +// endpointProfileTranslator's internal queue and sends to the grpc stream as +// appropriate. The goroutine calls non-thread-safe Send, therefore Start must +// not be called more than once. +func (ept *endpointProfileTranslator) Start() { + go func() { + for { + select { + case update := <-ept.updates: + ept.update(update) + case <-ept.stop: + return + } + } + }() +} + +// Stop terminates the goroutine started by Start. +func (ept *endpointProfileTranslator) Stop() { + close(ept.stop) +} + +// Update enqueues an address update to be translated into a DestinationProfile. +// An error is returned if the update cannot be enqueued. +func (ept *endpointProfileTranslator) Update(address *watcher.Address) error { + select { + case ept.updates <- address: + // Update has been successfully enqueued. + return nil + default: + select { + case <-ept.endStream: + // The endStream channel has already been closed so no action is + // necessary. + return fmt.Errorf("profile update stream closed") + default: + // We are unable to enqueue because the channel does not have capacity. + // The stream has fallen too far behind and should be closed. + endpointProfileUpdatesQueueOverflowCounter.Inc() + close(ept.endStream) + return fmt.Errorf("profile update queue full; aborting stream") + } + } +} + +func (ept *endpointProfileTranslator) update(address *watcher.Address) { opaquePorts := watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts) endpoint, err := ept.createEndpoint(*address, opaquePorts) if err != nil { - return false, fmt.Errorf("failed to create endpoint: %w", err) + ept.log.Errorf("Failed to create endpoint for %s:%d: %s", + address.IP, address.Port, err) + return } // The protocol for an endpoint should only be updated if there is a pod, @@ -79,17 +148,18 @@ func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, er Endpoint: endpoint, OpaqueProtocol: address.OpaqueProtocol, } - msg := profile.String() - if msg == ept.lastMessage { - return false, nil + if proto.Equal(profile, ept.current) { + ept.log.Debugf("Ignoring redundant profile update: %+v", profile) + return } - ept.lastMessage = msg - ept.log.Debugf("sending protocol update: %+v", profile) + + ept.log.Debugf("Sending profile update: %+v", profile) if err := ept.stream.Send(profile); err != nil { - return false, fmt.Errorf("failed to send protocol update: %w", err) + ept.log.Errorf("failed to send profile update: %s", err) + return } - return true, nil + ept.current = profile } func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) { diff --git a/controller/api/destination/endpoint_profile_translator_test.go b/controller/api/destination/endpoint_profile_translator_test.go new file mode 100644 index 0000000000000..3a4112f10f0c3 --- /dev/null +++ b/controller/api/destination/endpoint_profile_translator_test.go @@ -0,0 +1,127 @@ +package destination + +import ( + "testing" + "time" + + pb "github.com/linkerd/linkerd2-proxy-api/go/destination" + "github.com/linkerd/linkerd2/controller/api/destination/watcher" + consts "github.com/linkerd/linkerd2/pkg/k8s" + logging "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestEndpointProfileTranslator(t *testing.T) { + // logging.SetLevel(logging.TraceLevel) + // defer logging.SetLevel(logging.PanicLevel) + + addr := &watcher.Address{ + IP: "10.10.11.11", + Port: 8080, + } + podAddr := &watcher.Address{ + IP: "10.10.11.11", + Port: 8080, + Pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + consts.ProxyOpaquePortsAnnotation: "8080", + }, + }, + }, + } + + t.Run("Sends update", func(t *testing.T) { + mockGetProfileServer := &mockDestinationGetProfileServer{ + profilesReceived: make(chan *pb.DestinationProfile, 1), + } + log := logging.WithField("test", t.Name()) + translator := newEndpointProfileTranslator( + true, "cluster", "identity", make(map[uint32]struct{}), + nil, nil, + mockGetProfileServer, + nil, + log, + ) + translator.Start() + defer translator.Stop() + + if err := translator.Update(addr); err != nil { + t.Fatal("Expected update") + } + select { + case p := <-mockGetProfileServer.profilesReceived: + log.Debugf("Received update: %v", p) + case <-time.After(1 * time.Second): + t.Fatal("No update received") + } + + if err := translator.Update(addr); err != nil { + t.Fatal("Unexpected update") + } + select { + case p := <-mockGetProfileServer.profilesReceived: + t.Fatalf("Duplicate update sent: %v", p) + case <-time.After(1 * time.Second): + } + + if err := translator.Update(podAddr); err != nil { + t.Fatal("Expected update") + } + select { + case p := <-mockGetProfileServer.profilesReceived: + log.Debugf("Received update: %v", p) + case <-time.After(1 * time.Second): + } + }) + + t.Run("Handles overflow", func(t *testing.T) { + mockGetProfileServer := &mockDestinationGetProfileServer{ + profilesReceived: make(chan *pb.DestinationProfile, 1), + } + log := logging.WithField("test", t.Name()) + endStream := make(chan struct{}) + translator := newEndpointProfileTranslator( + true, "cluster", "identity", make(map[uint32]struct{}), + nil, nil, + mockGetProfileServer, + endStream, + log, + ) + translator.Start() + defer translator.Stop() + + for i := 0; i < updateQueueCapacity/2; i++ { + if err := translator.Update(podAddr); err != nil { + t.Fatal("Expected update") + } + select { + case <-endStream: + t.Fatal("Stream ended prematurely") + default: + } + + if err := translator.Update(addr); err != nil { + t.Fatal("Expected update") + } + select { + case <-endStream: + t.Fatal("Stream ended prematurely") + default: + } + } + + if err := translator.Update(podAddr); err == nil { + t.Fatal("Expected update to fail") + } + select { + case <-endStream: + default: + t.Fatal("Stream should have ended") + } + + // XXX We should assert that endpointProfileUpdatesQueueOverflowCounter + // == 1 but we can't read counter values. + }) +} diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index 4fadda88a6bce..ab60eef1d3323 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -484,16 +484,21 @@ func (s *server) subscribeToEndpointProfile( log *logging.Entry, stream pb.Destination_GetProfileServer, ) error { + canceled := stream.Context().Done() + streamEnd := make(chan struct{}) translator := newEndpointProfileTranslator( s.enableH2Upgrade, s.controllerNS, s.identityTrustDomain, s.defaultOpaquePorts, - log, - stream, s.k8sAPI, s.metadataAPI, + stream, + streamEnd, + log, ) + translator.Start() + defer translator.Stop() var err error ip, err = s.pods.Subscribe(service, hostname, ip, port, translator) @@ -504,8 +509,10 @@ func (s *server) subscribeToEndpointProfile( select { case <-s.shutdown: - case <-stream.Context().Done(): + case <-canceled: s.log.Debugf("Cancelled") + case <-streamEnd: + log.Errorf("GetProfile %s:%d stream aborted", ip, port) } return nil } diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index df79bab9a8249..73961b001fdc0 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -460,7 +460,7 @@ spec: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } log := logging.WithField("test", t.Name()) - logging.SetLevel(logging.TraceLevel) + // logging.SetLevel(logging.TraceLevel) defaultOpaquePorts := map[uint32]struct{}{ 25: {}, 443: {}, diff --git a/controller/api/destination/watcher/pod_watcher.go b/controller/api/destination/watcher/pod_watcher.go index ad19b9f2da4d0..ae9090e578a19 100644 --- a/controller/api/destination/watcher/pod_watcher.go +++ b/controller/api/destination/watcher/pod_watcher.go @@ -53,7 +53,7 @@ type ( // PodUpdateListener is the interface subscribers must implement. PodUpdateListener interface { - Update(*Address) (bool, error) + Update(*Address) error } ) @@ -115,13 +115,10 @@ func (pw *PodWatcher) Subscribe(service *ServiceID, hostname, ip string, port Po return "", err } - sent, err := listener.Update(&address) - if err != nil { - return "", err - } - if sent { - pp.metrics.incUpdates() + if err = listener.Update(&address); err != nil { + return "", fmt.Errorf("failed to send initial update: %w", err) } + pp.metrics.incUpdates() return pp.ip, nil } @@ -261,11 +258,11 @@ func (pw *PodWatcher) updateServers(_ any) { pw.log.Errorf("Error creating address for pod: %s", err) continue } - sent, err := listener.Update(&addr) - if err != nil { - pw.log.Errorf("Error calling pod watcher listener for server update: %s", err) + if err = listener.Update(&addr); err != nil { + pw.log.Warnf("Error sending update to listener: %s", err) + continue } - updated = updated || sent + updated = true } if updated { pp.metrics.incUpdates() @@ -464,11 +461,11 @@ func (pp *podPublisher) updatePod(pod *corev1.Pod) { pp.log.Errorf("Error creating address for pod: %s", err) continue } - sent, err := l.Update(&addr) - if err != nil { - pp.log.Errorf("Error calling pod watcher listener for pod update: %s", err) + if err = l.Update(&addr); err != nil { + pp.log.Warnf("Error sending update to listener: %s", err) + continue } - updated = updated || sent + updated = true } if updated { pp.metrics.incUpdates() @@ -487,11 +484,11 @@ func (pp *podPublisher) updatePod(pod *corev1.Pod) { pp.log.Errorf("Error creating address for pod: %s", err) continue } - sent, err := l.Update(&addr) - if err != nil { - pp.log.Errorf("Error calling pod watcher listener for pod deletion: %s", err) + if err = l.Update(&addr); err != nil { + pp.log.Warnf("Error sending update to listener: %s", err) + continue } - updated = updated || sent + updated = true } if updated { pp.metrics.incUpdates() diff --git a/jaeger/charts/linkerd-jaeger/Chart.yaml b/jaeger/charts/linkerd-jaeger/Chart.yaml index 30bf3a83e3bc4..f78aa90b20314 100644 --- a/jaeger/charts/linkerd-jaeger/Chart.yaml +++ b/jaeger/charts/linkerd-jaeger/Chart.yaml @@ -11,7 +11,7 @@ kubeVersion: ">=1.21.0-0" name: linkerd-jaeger sources: - https://github.com/linkerd/linkerd2/ -version: 30.12.8 +version: 30.12.9 icon: https://linkerd.io/images/logo-only-200h.png maintainers: - name: Linkerd authors diff --git a/jaeger/charts/linkerd-jaeger/README.md b/jaeger/charts/linkerd-jaeger/README.md index 995b53cab6f5c..af67bcee16b71 100644 --- a/jaeger/charts/linkerd-jaeger/README.md +++ b/jaeger/charts/linkerd-jaeger/README.md @@ -3,7 +3,7 @@ The Linkerd-Jaeger extension adds distributed tracing to Linkerd using OpenCensus and Jaeger. -![Version: 30.12.8](https://img.shields.io/badge/Version-30.12.8-informational?style=flat-square) +![Version: 30.12.9](https://img.shields.io/badge/Version-30.12.9-informational?style=flat-square) ![AppVersion: edge-XX.X.X](https://img.shields.io/badge/AppVersion-edge--XX.X.X-informational?style=flat-square) diff --git a/multicluster/charts/linkerd-multicluster/Chart.yaml b/multicluster/charts/linkerd-multicluster/Chart.yaml index 186a623549c3e..c8bad2f85f0e7 100644 --- a/multicluster/charts/linkerd-multicluster/Chart.yaml +++ b/multicluster/charts/linkerd-multicluster/Chart.yaml @@ -11,7 +11,7 @@ kubeVersion: ">=1.21.0-0" name: "linkerd-multicluster" sources: - https://github.com/linkerd/linkerd2/ -version: 30.11.8 +version: 30.11.9 icon: https://linkerd.io/images/logo-only-200h.png maintainers: - name: Linkerd authors diff --git a/multicluster/charts/linkerd-multicluster/README.md b/multicluster/charts/linkerd-multicluster/README.md index 21572a231fb1a..2a4343bc6a483 100644 --- a/multicluster/charts/linkerd-multicluster/README.md +++ b/multicluster/charts/linkerd-multicluster/README.md @@ -3,7 +3,7 @@ The Linkerd-Multicluster extension contains resources to support multicluster linking to remote clusters -![Version: 30.11.8](https://img.shields.io/badge/Version-30.11.8-informational?style=flat-square) +![Version: 30.11.9](https://img.shields.io/badge/Version-30.11.9-informational?style=flat-square) ![AppVersion: edge-XX.X.X](https://img.shields.io/badge/AppVersion-edge--XX.X.X-informational?style=flat-square) diff --git a/viz/charts/linkerd-viz/Chart.yaml b/viz/charts/linkerd-viz/Chart.yaml index 37fdfcb617bf1..f86dacd107002 100644 --- a/viz/charts/linkerd-viz/Chart.yaml +++ b/viz/charts/linkerd-viz/Chart.yaml @@ -11,7 +11,7 @@ kubeVersion: ">=1.21.0-0" name: "linkerd-viz" sources: - https://github.com/linkerd/linkerd2/ -version: 30.12.8 +version: 30.12.9 icon: https://linkerd.io/images/logo-only-200h.png maintainers: - name: Linkerd authors diff --git a/viz/charts/linkerd-viz/README.md b/viz/charts/linkerd-viz/README.md index a4f886b79f2d3..c0648a1b7a737 100644 --- a/viz/charts/linkerd-viz/README.md +++ b/viz/charts/linkerd-viz/README.md @@ -3,7 +3,7 @@ The Linkerd-Viz extension contains observability and visualization components for Linkerd. -![Version: 30.12.8](https://img.shields.io/badge/Version-30.12.8-informational?style=flat-square) +![Version: 30.12.9](https://img.shields.io/badge/Version-30.12.9-informational?style=flat-square) ![AppVersion: edge-XX.X.X](https://img.shields.io/badge/AppVersion-edge--XX.X.X-informational?style=flat-square)