Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add update queue to endpoint translator #11491

Merged
merged 7 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion controller/api/destination/destination_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func FuzzAdd(data []byte) int {
}
t := &testing.T{}
_, translator := makeEndpointTranslator(t)
translator.Start()
defer translator.Stop()
translator.Add(set)
translator.Remove(set)
return 1
Expand Down Expand Up @@ -52,7 +54,7 @@ func FuzzGet(data []byte) int {
server := makeServer(t)

stream := &bufferingGetStream{
updates: []*pb.Update{},
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
_ = server.Get(dest1, stream)
Expand Down
186 changes: 142 additions & 44 deletions controller/api/destination/endpoint_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"reflect"
"strconv"
"strings"
"sync"

pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2-proxy-api/go/net"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/addr"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
)
Expand All @@ -22,26 +23,55 @@ const (
// inboundListenAddr is the environment variable holding the inbound
// listening address for the proxy container.
envInboundListenAddr = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR"

updateQueueCapacity = 100
)

// endpointTranslator satisfies EndpointUpdateListener and translates updates
// into Destination.Get messages.
type endpointTranslator struct {
controllerNS string
identityTrustDomain string
enableH2Upgrade bool
nodeTopologyZone string
nodeName string
defaultOpaquePorts map[uint32]struct{}
enableEndpointFiltering bool

availableEndpoints watcher.AddressSet
filteredSnapshot watcher.AddressSet
stream pb.Destination_GetServer
log *logging.Entry

mu sync.Mutex
}
// endpointTranslator holds the per-stream state used to turn endpoint
// add/remove/no-endpoints events into updates sent on a Destination.Get
// stream. Events are queued on the updates channel and applied by the
// goroutine launched in Start.
type endpointTranslator struct {
	controllerNS            string
	identityTrustDomain     string
	enableH2Upgrade         bool
	nodeTopologyZone        string
	nodeName                string
	defaultOpaquePorts      map[uint32]struct{}
	enableEndpointFiltering bool

	// availableEndpoints gains addresses in add, loses them in remove, and is
	// reset to an empty map by noEndpoints.
	availableEndpoints watcher.AddressSet
	// filteredSnapshot is reset to an empty map by noEndpoints.
	filteredSnapshot watcher.AddressSet
	// stream is the server stream that updates are sent on.
	stream pb.Destination_GetServer
	// endStream is closed by enqueueUpdate when an update cannot be queued.
	endStream chan struct{}
	log       *logging.Entry
	// overflowCounter is incremented every time an update cannot be queued.
	overflowCounter prometheus.Counter

	// updates carries *addUpdate, *removeUpdate and *noEndpointsUpdate values
	// from the enqueueing methods to the goroutine launched in Start.
	updates chan interface{}
	// stop is closed by Stop to make that goroutine return.
	stop chan struct{}
}

// addUpdate is the queued form of an Add call.
type addUpdate struct {
	set watcher.AddressSet
}

// removeUpdate is the queued form of a Remove call.
type removeUpdate struct {
	set watcher.AddressSet
}

// noEndpointsUpdate is the queued form of a NoEndpoints call.
type noEndpointsUpdate struct {
	exists bool
}

// updatesQueueOverflowCounter is a counter vector, labelled by "service",
// that tracks how often an endpoint update queue has overflowed.
var updatesQueueOverflowCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Help: "A counter incremented whenever the endpoint updates queue overflows",
		Name: "endpoint_updates_queue_overflow",
	},
	[]string{"service"},
)

func newEndpointTranslator(
controllerNS string,
Expand All @@ -53,6 +83,7 @@ func newEndpointTranslator(
enableEndpointFiltering bool,
k8sAPI *k8s.MetadataAPI,
stream pb.Destination_GetServer,
endStream chan struct{},
log *logging.Entry,
) *endpointTranslator {
log = log.WithFields(logging.Fields{
Expand All @@ -79,33 +110,120 @@ func newEndpointTranslator(
availableEndpoints,
filteredSnapshot,
stream,
endStream,
log,
sync.Mutex{},
updatesQueueOverflowCounter.With(prometheus.Labels{"service": service}),
make(chan interface{}, updateQueueCapacity),
make(chan struct{}),
}
}

func (et *endpointTranslator) Add(set watcher.AddressSet) {
et.mu.Lock()
defer et.mu.Unlock()
et.enqueueUpdate(&addUpdate{set})
}

// Remove wraps set in a removeUpdate and hands it to enqueueUpdate; the
// removal itself is applied later, when the queued update is processed.
func (et *endpointTranslator) Remove(set watcher.AddressSet) {
	pending := &removeUpdate{set: set}
	et.enqueueUpdate(pending)
}

// NoEndpoints wraps exists in a noEndpointsUpdate and hands it to
// enqueueUpdate; the update is applied later, when it is processed off the
// queue.
func (et *endpointTranslator) NoEndpoints(exists bool) {
	pending := &noEndpointsUpdate{exists: exists}
	et.enqueueUpdate(pending)
}

// Add, Remove, and NoEndpoints are called from a client-go informer callback
// and therefore must not block. For each of these, we enqueue an update in
// a channel so that it can be processed asynchronously. To ensure that enqueuing
// does not block, we first check to see if there is capacity in the buffered
// channel. If there is not, we drop the update and signal to the stream that
// it has fallen too far behind and should be closed.
func (et *endpointTranslator) enqueueUpdate(update interface{}) {
select {
case et.updates <- update:
// Update has been successfully enqueued.
default:
// We are unable to enqueue because the channel does not have capacity.
// The stream has fallen too far behind and should be closed.
et.overflowCounter.Inc()
mateiidavid marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-et.endStream:
// The endStream channel has already been closed so no action is
// necessary.
default:
et.log.Error("endpoint update queue full; aborting stream")
close(et.endStream)
}
}
}

// Start launches the single worker goroutine for this endpointTranslator. The
// worker pulls queued updates off the internal channel one at a time and hands
// each to processUpdate, and returns once the stop channel is closed. Because
// the worker calls functions that are not thread-safe (including Send on the
// grpc stream), Start must not be called more than once.
func (et *endpointTranslator) Start() {
	worker := func() {
		for {
			select {
			case <-et.stop:
				return
			case next := <-et.updates:
				et.processUpdate(next)
			}
		}
	}
	go worker()
}

// Stop terminates the goroutine started by Start by closing the stop channel.
//
// A repeated call is a no-op: closing an already-closed channel panics, so the
// channel is only closed if it is still open. The check and the close are not
// atomic, so Stop must not be called concurrently from multiple goroutines.
func (et *endpointTranslator) Stop() {
	select {
	case <-et.stop:
		// Already closed by an earlier Stop; nothing left to do.
	default:
		close(et.stop)
	}
}

// processUpdate applies a single queued update by dispatching on its concrete
// type: *addUpdate goes to add, *removeUpdate to remove, and
// *noEndpointsUpdate to noEndpoints. Any other value is logged and dropped
// rather than being discarded silently, since it indicates a programming
// error in whatever enqueued it.
func (et *endpointTranslator) processUpdate(update interface{}) {
	switch u := update.(type) {
	case *addUpdate:
		et.add(u.set)
	case *removeUpdate:
		et.remove(u.set)
	case *noEndpointsUpdate:
		et.noEndpoints(u.exists)
	default:
		et.log.Warnf("ignoring endpoint update of unexpected type %T", update)
	}
}

// add copies every address in set into the translator's available endpoints,
// overwriting any entry that shares an ID, and then passes set to
// sendFilteredUpdate.
func (et *endpointTranslator) add(set watcher.AddressSet) {
	known := et.availableEndpoints.Addresses
	for key, endpoint := range set.Addresses {
		known[key] = endpoint
	}

	et.sendFilteredUpdate(set)
}

func (et *endpointTranslator) Remove(set watcher.AddressSet) {
et.mu.Lock()
defer et.mu.Unlock()

func (et *endpointTranslator) remove(set watcher.AddressSet) {
for id := range set.Addresses {
delete(et.availableEndpoints.Addresses, id)
}

et.sendFilteredUpdate(set)
}

func (et *endpointTranslator) noEndpoints(exists bool) {
et.log.Debugf("NoEndpoints(%+v)", exists)

et.availableEndpoints.Addresses = map[watcher.ID]watcher.Address{}
et.filteredSnapshot.Addresses = map[watcher.ID]watcher.Address{}

u := &pb.Update{
Update: &pb.Update_NoEndpoints{
NoEndpoints: &pb.NoEndpoints{
Exists: exists,
},
},
}

et.log.Debugf("Sending destination no endpoints: %+v", u)
if err := et.stream.Send(u); err != nil {
et.log.Debugf("Failed to send address update: %s", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is previous code that we've moved, but why would an error be logged at Debug level?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We expect to see this every time a client proxy restarts/closes a stream.

}
}

func (et *endpointTranslator) sendFilteredUpdate(set watcher.AddressSet) {
et.availableEndpoints = watcher.AddressSet{
Addresses: et.availableEndpoints.Addresses,
Expand Down Expand Up @@ -244,26 +362,6 @@ func (et *endpointTranslator) diffEndpoints(filtered watcher.AddressSet) (watche
}
}

func (et *endpointTranslator) NoEndpoints(exists bool) {
et.log.Debugf("NoEndpoints(%+v)", exists)

et.availableEndpoints.Addresses = map[watcher.ID]watcher.Address{}
et.filteredSnapshot.Addresses = map[watcher.ID]watcher.Address{}

u := &pb.Update{
Update: &pb.Update_NoEndpoints{
NoEndpoints: &pb.NoEndpoints{
Exists: exists,
},
},
}

et.log.Debugf("Sending destination no endpoints: %+v", u)
if err := et.stream.Send(u); err != nil {
et.log.Debugf("Failed to send address update: %s", err)
}
}

func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
addrs := []*pb.WeightedAddr{}
for _, address := range set.Addresses {
Expand Down
Loading
Loading