Skip to content

Commit

Permalink
Add update queue to endpoint translator (#11491)
Browse files Browse the repository at this point in the history
When a grpc client of the destination.Get API initiates a request but then doesn't read off of that stream, the HTTP2 stream flow control window will fill up and eventually exert backpressure on the destination controller.  This manifests as calls to `Send` on the stream blocking.  Since `Send` is called synchronously from the client-go informer callback (by way of the endpoint translator), this blocks the informer callback and prevents all further informer calllbacks from firing.  This causes the destination controller to stop sending updates to any of its clients.

We add a queue in the endpoint translator so that when it gets an update from the informer callback, that update is queued and we avoid potentially blocking the informer callback.  Each endpoint translator spawns a goroutine to process this queue and call `Send`.  If there is not capacity in this queue (e.g. because a client has stopped reading and we are experiencing backpressure) then we terminate the stream.

Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong authored and mateiidavid committed Oct 26, 2023
1 parent 59418e4 commit 12e7b86
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 187 deletions.
4 changes: 3 additions & 1 deletion controller/api/destination/destination_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func FuzzAdd(data []byte) int {
}
t := &testing.T{}
_, translator := makeEndpointTranslator(t)
translator.Start()
defer translator.Stop()
translator.Add(set)
translator.Remove(set)
return 1
Expand Down Expand Up @@ -52,7 +54,7 @@ func FuzzGet(data []byte) int {
server := makeServer(t)

stream := &bufferingGetStream{
updates: []*pb.Update{},
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
_ = server.Get(dest1, stream)
Expand Down
186 changes: 142 additions & 44 deletions controller/api/destination/endpoint_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"reflect"
"strconv"
"strings"
"sync"

pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2-proxy-api/go/net"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/addr"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
)
Expand All @@ -22,26 +23,55 @@ const (
// inboundListenAddr is the environment variable holding the inbound
// listening address for the proxy container.
envInboundListenAddr = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR"

updateQueueCapacity = 100
)

// endpointTranslator satisfies EndpointUpdateListener and translates updates
// into Destination.Get messages.
type endpointTranslator struct {
controllerNS string
identityTrustDomain string
enableH2Upgrade bool
nodeTopologyZone string
nodeName string
defaultOpaquePorts map[uint32]struct{}
enableEndpointFiltering bool

availableEndpoints watcher.AddressSet
filteredSnapshot watcher.AddressSet
stream pb.Destination_GetServer
log *logging.Entry

mu sync.Mutex
}
type (
endpointTranslator struct {
controllerNS string
identityTrustDomain string
enableH2Upgrade bool
nodeTopologyZone string
nodeName string
defaultOpaquePorts map[uint32]struct{}
enableEndpointFiltering bool

availableEndpoints watcher.AddressSet
filteredSnapshot watcher.AddressSet
stream pb.Destination_GetServer
endStream chan struct{}
log *logging.Entry
overflowCounter prometheus.Counter

updates chan interface{}
stop chan struct{}
}

addUpdate struct {
set watcher.AddressSet
}

removeUpdate struct {
set watcher.AddressSet
}

noEndpointsUpdate struct {
exists bool
}
)

var updatesQueueOverflowCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "endpoint_updates_queue_overflow",
Help: "A counter incremented whenever the endpoint updates queue overflows",
},
[]string{
"service",
},
)

func newEndpointTranslator(
controllerNS string,
Expand All @@ -53,6 +83,7 @@ func newEndpointTranslator(
enableEndpointFiltering bool,
k8sAPI *k8s.MetadataAPI,
stream pb.Destination_GetServer,
endStream chan struct{},
log *logging.Entry,
) *endpointTranslator {
log = log.WithFields(logging.Fields{
Expand All @@ -79,33 +110,120 @@ func newEndpointTranslator(
availableEndpoints,
filteredSnapshot,
stream,
endStream,
log,
sync.Mutex{},
updatesQueueOverflowCounter.With(prometheus.Labels{"service": service}),
make(chan interface{}, updateQueueCapacity),
make(chan struct{}),
}
}

func (et *endpointTranslator) Add(set watcher.AddressSet) {
et.mu.Lock()
defer et.mu.Unlock()
et.enqueueUpdate(&addUpdate{set})
}

func (et *endpointTranslator) Remove(set watcher.AddressSet) {
et.enqueueUpdate(&removeUpdate{set})
}

func (et *endpointTranslator) NoEndpoints(exists bool) {
et.enqueueUpdate(&noEndpointsUpdate{exists})
}

// Add, Remove, and NoEndpoints are called from a client-go informer callback
// and therefore must not block. For each of these, we enqueue an update in
// a channel so that it can be processed asyncronously. To ensure that enqueuing
// does not block, we first check to see if there is capacity in the buffered
// channel. If there is not, we drop the update and signal to the stream that
// it has fallen too far behind and should be closed.
func (et *endpointTranslator) enqueueUpdate(update interface{}) {
select {
case et.updates <- update:
// Update has been successfully enqueued.
default:
// We are unable to enqueue because the channel does not have capacity.
// The stream has fallen too far behind and should be closed.
et.overflowCounter.Inc()
select {
case <-et.endStream:
// The endStream channel has already been closed so no action is
// necessary.
default:
et.log.Error("endpoint update queue full; aborting stream")
close(et.endStream)
}
}
}

// Start initiates a goroutine which processes update events off of the
// endpointTranslator's internal queue and sends to the grpc stream as
// appropriate. The goroutine calls several non-thread-safe functions (including
// Send) and therefore, Start must not be called more than once.
func (et *endpointTranslator) Start() {
go func() {
for {
select {
case update := <-et.updates:
et.processUpdate(update)
case <-et.stop:
return
}
}
}()
}

// Stop terminates the goroutine started by Start.
func (et *endpointTranslator) Stop() {
close(et.stop)
}

func (et *endpointTranslator) processUpdate(update interface{}) {
switch update := update.(type) {
case *addUpdate:
et.add(update.set)
case *removeUpdate:
et.remove(update.set)
case *noEndpointsUpdate:
et.noEndpoints(update.exists)
}
}

func (et *endpointTranslator) add(set watcher.AddressSet) {
for id, address := range set.Addresses {
et.availableEndpoints.Addresses[id] = address
}

et.sendFilteredUpdate(set)
}

func (et *endpointTranslator) Remove(set watcher.AddressSet) {
et.mu.Lock()
defer et.mu.Unlock()

func (et *endpointTranslator) remove(set watcher.AddressSet) {
for id := range set.Addresses {
delete(et.availableEndpoints.Addresses, id)
}

et.sendFilteredUpdate(set)
}

func (et *endpointTranslator) noEndpoints(exists bool) {
et.log.Debugf("NoEndpoints(%+v)", exists)

et.availableEndpoints.Addresses = map[watcher.ID]watcher.Address{}
et.filteredSnapshot.Addresses = map[watcher.ID]watcher.Address{}

u := &pb.Update{
Update: &pb.Update_NoEndpoints{
NoEndpoints: &pb.NoEndpoints{
Exists: exists,
},
},
}

et.log.Debugf("Sending destination no endpoints: %+v", u)
if err := et.stream.Send(u); err != nil {
et.log.Debugf("Failed to send address update: %s", err)
}
}

func (et *endpointTranslator) sendFilteredUpdate(set watcher.AddressSet) {
et.availableEndpoints = watcher.AddressSet{
Addresses: et.availableEndpoints.Addresses,
Expand Down Expand Up @@ -244,26 +362,6 @@ func (et *endpointTranslator) diffEndpoints(filtered watcher.AddressSet) (watche
}
}

func (et *endpointTranslator) NoEndpoints(exists bool) {
et.log.Debugf("NoEndpoints(%+v)", exists)

et.availableEndpoints.Addresses = map[watcher.ID]watcher.Address{}
et.filteredSnapshot.Addresses = map[watcher.ID]watcher.Address{}

u := &pb.Update{
Update: &pb.Update_NoEndpoints{
NoEndpoints: &pb.NoEndpoints{
Exists: exists,
},
},
}

et.log.Debugf("Sending destination no endpoints: %+v", u)
if err := et.stream.Send(u); err != nil {
et.log.Debugf("Failed to send address update: %s", err)
}
}

func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
addrs := []*pb.WeightedAddr{}
for _, address := range set.Addresses {
Expand Down
Loading

0 comments on commit 12e7b86

Please sign in to comment.