Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stable-2.14.8 change notes #11870

Merged
merged 2 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changes

## stable-2.14.8

This stable release fixes an issue in the control plane where discovery for pod
IP addresses could hang indefinitely ([#11815]).

[#11815]: https://github.com/linkerd/linkerd2/pull/11815

## stable-2.14.7

This stable release fixes two bugs in the Linkerd control plane.
Expand Down
2 changes: 1 addition & 1 deletion charts/linkerd-control-plane/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies:
- name: partials
version: 0.1.0
repository: file://../partials
version: 1.16.8
version: 1.16.9
icon: https://linkerd.io/images/logo-only-200h.png
maintainers:
- name: Linkerd authors
Expand Down
2 changes: 1 addition & 1 deletion charts/linkerd-control-plane/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Linkerd gives you observability, reliability, and security
for your microservices — with no code change required.

![Version: 1.16.8](https://img.shields.io/badge/Version-1.16.8-informational?style=flat-square)
![Version: 1.16.9](https://img.shields.io/badge/Version-1.16.9-informational?style=flat-square)
![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square)
![AppVersion: edge-XX.X.X](https://img.shields.io/badge/AppVersion-edge--XX.X.X-informational?style=flat-square)

Expand Down
102 changes: 86 additions & 16 deletions controller/api/destination/endpoint_profile_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,122 @@ import (
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type endpointProfileTranslator struct {
enableH2Upgrade bool
controllerNS string
identityTrustDomain string
defaultOpaquePorts map[uint32]struct{}
stream pb.Destination_GetProfileServer
lastMessage string

stream pb.Destination_GetProfileServer
endStream chan struct{}

updates chan *watcher.Address
stop chan struct{}

current *pb.DestinationProfile

k8sAPI *k8s.API
metadataAPI *k8s.MetadataAPI
log *logging.Entry
}

// newEndpointProfileTranslator translates pod updates and protocol updates to
// endpointProfileUpdatesQueueOverflowCounter is a prometheus counter that is incremented
// whenever the profile updates queue overflows.
//
// We omit ip and port labels because they are high cardinality.
var endpointProfileUpdatesQueueOverflowCounter = promauto.NewCounter(
prometheus.CounterOpts{
Name: "endpoint_profile_updates_queue_overflow",
Help: "A counter incremented whenever the endpoint profile updates queue overflows",
},
)

// newEndpointProfileTranslator translates pod updates and profile updates to
// DestinationProfiles for endpoints
func newEndpointProfileTranslator(
enableH2Upgrade bool,
controllerNS,
identityTrustDomain string,
defaultOpaquePorts map[uint32]struct{},
log *logging.Entry,
stream pb.Destination_GetProfileServer,
k8sAPI *k8s.API,
metadataAPI *k8s.MetadataAPI,
stream pb.Destination_GetProfileServer,
endStream chan struct{},
log *logging.Entry,
) *endpointProfileTranslator {
return &endpointProfileTranslator{
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
defaultOpaquePorts: defaultOpaquePorts,
stream: stream,
endStream: endStream,
updates: make(chan *watcher.Address, updateQueueCapacity),
stop: make(chan struct{}),
k8sAPI: k8sAPI,
metadataAPI: metadataAPI,
log: log.WithField("component", "endpoint-profile-translator"),
}
}

// Update sends a DestinationProfile message into the stream, if the same
// message hasn't been sent already. If it has, false is returned.
func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, error) {
// Start initiates a goroutine which processes update events off of the
// endpointProfileTranslator's internal queue and sends to the grpc stream as
// appropriate. The goroutine calls non-thread-safe Send, therefore Start must
// not be called more than once.
func (ept *endpointProfileTranslator) Start() {
go func() {
for {
select {
case update := <-ept.updates:
ept.update(update)
case <-ept.stop:
return
}
}
}()
}

// Stop terminates the goroutine started by Start.
func (ept *endpointProfileTranslator) Stop() {
close(ept.stop)
}

// Update enqueues an address update to be translated into a DestinationProfile.
// An error is returned if the update cannot be enqueued.
func (ept *endpointProfileTranslator) Update(address *watcher.Address) error {
select {
case ept.updates <- address:
// Update has been successfully enqueued.
return nil
default:
select {
case <-ept.endStream:
// The endStream channel has already been closed so no action is
// necessary.
return fmt.Errorf("profile update stream closed")
default:
// We are unable to enqueue because the channel does not have capacity.
// The stream has fallen too far behind and should be closed.
endpointProfileUpdatesQueueOverflowCounter.Inc()
close(ept.endStream)
return fmt.Errorf("profile update queue full; aborting stream")
}
}
}

func (ept *endpointProfileTranslator) update(address *watcher.Address) {
opaquePorts := watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts)
endpoint, err := ept.createEndpoint(*address, opaquePorts)
if err != nil {
return false, fmt.Errorf("failed to create endpoint: %w", err)
ept.log.Errorf("Failed to create endpoint for %s:%d: %s",
address.IP, address.Port, err)
return
}

// The protocol for an endpoint should only be updated if there is a pod,
Expand All @@ -79,17 +148,18 @@ func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, er
Endpoint: endpoint,
OpaqueProtocol: address.OpaqueProtocol,
}
msg := profile.String()
if msg == ept.lastMessage {
return false, nil
if proto.Equal(profile, ept.current) {
ept.log.Debugf("Ignoring redundant profile update: %+v", profile)
return
}
ept.lastMessage = msg
ept.log.Debugf("sending protocol update: %+v", profile)

ept.log.Debugf("Sending profile update: %+v", profile)
if err := ept.stream.Send(profile); err != nil {
return false, fmt.Errorf("failed to send protocol update: %w", err)
ept.log.Errorf("failed to send profile update: %s", err)
return
}

return true, nil
ept.current = profile
}

func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) {
Expand Down
127 changes: 127 additions & 0 deletions controller/api/destination/endpoint_profile_translator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package destination

import (
"testing"
"time"

pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
consts "github.com/linkerd/linkerd2/pkg/k8s"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestEndpointProfileTranslator(t *testing.T) {
// logging.SetLevel(logging.TraceLevel)
// defer logging.SetLevel(logging.PanicLevel)

addr := &watcher.Address{
IP: "10.10.11.11",
Port: 8080,
}
podAddr := &watcher.Address{
IP: "10.10.11.11",
Port: 8080,
Pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
consts.ProxyOpaquePortsAnnotation: "8080",
},
},
},
}

t.Run("Sends update", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{
profilesReceived: make(chan *pb.DestinationProfile, 1),
}
log := logging.WithField("test", t.Name())
translator := newEndpointProfileTranslator(
true, "cluster", "identity", make(map[uint32]struct{}),
nil, nil,
mockGetProfileServer,
nil,
log,
)
translator.Start()
defer translator.Stop()

if err := translator.Update(addr); err != nil {
t.Fatal("Expected update")
}
select {
case p := <-mockGetProfileServer.profilesReceived:
log.Debugf("Received update: %v", p)
case <-time.After(1 * time.Second):
t.Fatal("No update received")
}

if err := translator.Update(addr); err != nil {
t.Fatal("Unexpected update")
}
select {
case p := <-mockGetProfileServer.profilesReceived:
t.Fatalf("Duplicate update sent: %v", p)
case <-time.After(1 * time.Second):
}

if err := translator.Update(podAddr); err != nil {
t.Fatal("Expected update")
}
select {
case p := <-mockGetProfileServer.profilesReceived:
log.Debugf("Received update: %v", p)
case <-time.After(1 * time.Second):
}
})

t.Run("Handles overflow", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{
profilesReceived: make(chan *pb.DestinationProfile, 1),
}
log := logging.WithField("test", t.Name())
endStream := make(chan struct{})
translator := newEndpointProfileTranslator(
true, "cluster", "identity", make(map[uint32]struct{}),
nil, nil,
mockGetProfileServer,
endStream,
log,
)
translator.Start()
defer translator.Stop()

for i := 0; i < updateQueueCapacity/2; i++ {
if err := translator.Update(podAddr); err != nil {
t.Fatal("Expected update")
}
select {
case <-endStream:
t.Fatal("Stream ended prematurely")
default:
}

if err := translator.Update(addr); err != nil {
t.Fatal("Expected update")
}
select {
case <-endStream:
t.Fatal("Stream ended prematurely")
default:
}
}

if err := translator.Update(podAddr); err == nil {
t.Fatal("Expected update to fail")
}
select {
case <-endStream:
default:
t.Fatal("Stream should have ended")
}

// XXX We should assert that endpointProfileUpdatesQueueOverflowCounter
// == 1 but we can't read counter values.
})
}
13 changes: 10 additions & 3 deletions controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,16 +484,21 @@ func (s *server) subscribeToEndpointProfile(
log *logging.Entry,
stream pb.Destination_GetProfileServer,
) error {
canceled := stream.Context().Done()
streamEnd := make(chan struct{})
translator := newEndpointProfileTranslator(
s.enableH2Upgrade,
s.controllerNS,
s.identityTrustDomain,
s.defaultOpaquePorts,
log,
stream,
s.k8sAPI,
s.metadataAPI,
stream,
streamEnd,
log,
)
translator.Start()
defer translator.Stop()

var err error
ip, err = s.pods.Subscribe(service, hostname, ip, port, translator)
Expand All @@ -504,8 +509,10 @@ func (s *server) subscribeToEndpointProfile(

select {
case <-s.shutdown:
case <-stream.Context().Done():
case <-canceled:
s.log.Debugf("Cancelled")
case <-streamEnd:
log.Errorf("GetProfile %s:%d stream aborted", ip, port)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ spec:
t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
}
log := logging.WithField("test", t.Name())
logging.SetLevel(logging.TraceLevel)
// logging.SetLevel(logging.TraceLevel)
defaultOpaquePorts := map[uint32]struct{}{
25: {},
443: {},
Expand Down
Loading
Loading