Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kds): fix retry on NACK and add backoff #9736

Merged
merged 12 commits into from
Apr 8, 2024
5 changes: 5 additions & 0 deletions pkg/events/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type TriggerInsightsComputationEvent struct {
TenantID string
}

// TriggerKDSResyncEvent asks for a KDS sync of a single resource type to be scheduled for one node.
// EventBasedWatchdog only accepts the event when NodeID equals the ID of the node it serves.
type TriggerKDSResyncEvent struct {
// Type is the resource type for which a sync should be scheduled.
Type model.ResourceType
// NodeID is the ID of the node the resync is meant for.
NodeID string
}

var ListenerStoppedErr = errors.New("listener closed")

type Listener interface {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kds/v2/reconcile/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type Reconciler interface {
// Reconcile reconciles state of node given changed resource types.
// Returns error and bool which is true if any resource was changed.
Reconcile(context.Context, *envoy_core.Node, map[model.ResourceType]struct{}, logr.Logger) (error, bool)
// ForceVersion marks the given resource type for the given node so that it obtains a new version even if nothing changes.
// Note that it does not change the snapshot; for this to actually apply on Envoy, Reconcile needs to be called.
// Reconcile is not called immediately, to avoid parallel Reconcile calls for the same node.
ForceVersion(node *envoy_core.Node, resourceType model.ResourceType)
Clear(context.Context, *envoy_core.Node) error
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/kds/v2/reconcile/reconcile_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package reconcile_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

// TestReconcile is the `go test` entry point of this package: it hands the
// testing.T to test.RunSpecs under the suite name "Reconcile Suite".
func TestReconcile(t *testing.T) {
test.RunSpecs(t, "Reconcile Suite")
}
114 changes: 55 additions & 59 deletions pkg/kds/v2/reconcile/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@ package reconcile

import (
"context"
"errors"
"sync"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"

config_core "github.com/kumahq/kuma/pkg/config/core"
"github.com/kumahq/kuma/pkg/core"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
cache_v2 "github.com/kumahq/kuma/pkg/kds/v2/cache"
util_kds_v2 "github.com/kumahq/kuma/pkg/kds/v2/util"
Expand All @@ -29,6 +26,7 @@ func NewReconciler(hasher envoy_cache.NodeHash, cache envoy_cache.SnapshotCache,
mode: mode,
statsCallbacks: statsCallbacks,
tenants: tenants,
forceVersions: map[string][]core_model.ResourceType{},
}
}

Expand All @@ -41,6 +39,16 @@ type reconciler struct {
tenants multitenant.Tenants

lock sync.Mutex

forceVersions map[string][]core_model.ResourceType
forceVersionsLock sync.RWMutex
}

// ForceVersion queues resourceType for the given node so that the next
// Reconcile of that node treats the type as changed. The node is keyed by
// the ID computed by the reconciler's hasher. Only the pending list is
// touched here; no snapshot is modified and Reconcile is not invoked.
func (r *reconciler) ForceVersion(node *envoy_core.Node, resourceType core_model.ResourceType) {
	key := r.hasher.ID(node)

	r.forceVersionsLock.Lock()
	defer r.forceVersionsLock.Unlock()

	pending := r.forceVersions[key]
	r.forceVersions[key] = append(pending, resourceType)
}

func (r *reconciler) Clear(ctx context.Context, node *envoy_core.Node) error {
Expand Down Expand Up @@ -88,79 +96,67 @@ func (r *reconciler) Reconcile(ctx context.Context, node *envoy_core.Node, chang
if new == nil {
return errors.New("nil snapshot"), false
}
// call ConstructVersionMap, so we can override versions if needed and compute what changed
if old != nil {
// this should already be computed by SetSnapshot, but we call it just to make sure we have versions.
if err := old.ConstructVersionMap(); err != nil {
return errors.Wrap(err, "could not construct version map"), false
}
}
if err := new.ConstructVersionMap(); err != nil {
return errors.Wrap(err, "could not construct version map"), false
}
r.forceNewVersion(new, id)
slonka marked this conversation as resolved.
Show resolved Hide resolved

new, changed := r.Version(new, old)
if changed {
r.logChanges(logger, new, old, node)
r.meterConfigReadyForDelivery(new, old, node.Id)
if changed := r.changedTypes(old, new); len(changed) > 0 {
r.logChanges(logger, changed, node)
r.meterConfigReadyForDelivery(changed, node.Id)
return r.cache.SetSnapshot(ctx, id, new), true
}
return nil, false
}

func (r *reconciler) Version(new, old envoy_cache.ResourceSnapshot) (envoy_cache.ResourceSnapshot, bool) {
if new == nil {
return nil, false
}
changed := false
newResources := map[core_model.ResourceType]envoy_cache.Resources{}
func (r *reconciler) changedTypes(old, new envoy_cache.ResourceSnapshot) []core_model.ResourceType {
var changed []core_model.ResourceType
for _, typ := range util_kds_v2.GetSupportedTypes() {
version := new.GetVersion(typ)
if version != "" {
// favor a version assigned by resource generator
continue
}

if old != nil && r.equal(new.GetResources(typ), old.GetResources(typ)) {
version = old.GetVersion(typ)
}
if version == "" {
version = core.NewUUID()
changed = true
}
if new.GetVersion(typ) == version {
continue
if (old == nil && len(new.GetVersionMap(typ)) > 0) ||
(old != nil && !maps.Equal(old.GetVersionMap(typ), new.GetVersionMap(typ))) {
changed = append(changed, core_model.ResourceType(typ))
}
n := map[string]envoy_types.ResourceWithTTL{}
for k, v := range new.GetResourcesAndTTL(typ) {
n[k] = v
}
newResources[core_model.ResourceType(typ)] = envoy_cache.Resources{Version: version, Items: n}
}
return &cache_v2.Snapshot{
Resources: newResources,
}, changed
return changed
}

func (_ *reconciler) equal(new, old map[string]envoy_types.Resource) bool {
if len(new) != len(old) {
return false
}
for key, newValue := range new {
if oldValue, hasOldValue := old[key]; !hasOldValue || !proto.Equal(newValue, oldValue) {
return false
// see kdsRetryForcer for more information
func (r *reconciler) forceNewVersion(snapshot envoy_cache.ResourceSnapshot, id string) {
r.forceVersionsLock.Lock()
forceVersionsForTypes := r.forceVersions[id]
delete(r.forceVersions, id)
r.forceVersionsLock.Unlock()
for _, typ := range forceVersionsForTypes {
cacheSnapshot, ok := snapshot.(*cache_v2.Snapshot)
if !ok {
panic("invalid type of Snapshot")
}
for resourceName := range cacheSnapshot.VersionMap[typ] {
cacheSnapshot.VersionMap[typ][resourceName] = ""
}
}
return true
}

func (r *reconciler) logChanges(logger logr.Logger, new envoy_cache.ResourceSnapshot, old envoy_cache.ResourceSnapshot, node *envoy_core.Node) {
for _, typ := range util_kds_v2.GetSupportedTypes() {
if old != nil && old.GetVersion(typ) != new.GetVersion(typ) {
client := node.Id
if r.mode == config_core.Zone {
// we need to override client name because Zone is always a client to Global (on gRPC level)
client = "global"
}
logger.Info("detected changes in the resources. Sending changes to the client.", "resourceType", typ, "client", client) // todo is client needed?
func (r *reconciler) logChanges(logger logr.Logger, changedTypes []core_model.ResourceType, node *envoy_core.Node) {
for _, typ := range changedTypes {
client := node.Id
if r.mode == config_core.Zone {
// we need to override client name because Zone is always a client to Global (on gRPC level)
client = "global"
}
logger.Info("detected changes in the resources. Sending changes to the client.", "resourceType", typ, "client", client) // todo is client needed?
}
}

func (r *reconciler) meterConfigReadyForDelivery(new envoy_cache.ResourceSnapshot, old envoy_cache.ResourceSnapshot, nodeID string) {
for _, typ := range util_kds_v2.GetSupportedTypes() {
if old == nil || old.GetVersion(typ) != new.GetVersion(typ) {
r.statsCallbacks.ConfigReadyForDelivery(nodeID + typ)
}
func (r *reconciler) meterConfigReadyForDelivery(changedTypes []core_model.ResourceType, nodeID string) {
for _, typ := range changedTypes {
r.statsCallbacks.ConfigReadyForDelivery(nodeID + string(typ))
}
}
96 changes: 96 additions & 0 deletions pkg/kds/v2/reconcile/reconciler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package reconcile_test

import (
"context"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

config_core "github.com/kumahq/kuma/pkg/config/core"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/kds/v2/reconcile"
"github.com/kumahq/kuma/pkg/kds/v2/server"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
"github.com/kumahq/kuma/pkg/multitenant"
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
"github.com/kumahq/kuma/pkg/test/resources/samples"
util_xds "github.com/kumahq/kuma/pkg/util/xds"
)

// Specs for reconcile.Reconciler, run against the memory resource store plugin
// and an envoy snapshot cache built for each spec.
var _ = Describe("Reconciler", func() {
var reconciler reconcile.Reconciler
var store core_store.ResourceStore
var snapshotCache envoy_cache.SnapshotCache

// node is the single client node used by every spec; only its Id is set.
node := &envoy_core.Node{
Id: "a",
}
// changedTypes is the set of resource types handed to Reconcile; only Mesh is listed.
changedTypes := map[core_model.ResourceType]struct{}{
core_mesh.MeshType: {},
}

// Build a fresh store, snapshot generator, snapshot cache and reconciler (Zone mode,
// single tenant) before each spec so that specs do not share state.
BeforeEach(func() {
store = memory.NewStore()
generator := reconcile.NewSnapshotGenerator(core_manager.NewResourceManager(store), reconcile.Any, reconcile.NoopResourceMapper)
hasher := &server.Hasher{}
snapshotCache = envoy_cache.NewSnapshotCache(false, hasher, util_xds.NewLogger(logr.Discard()))
metrics, err := core_metrics.NewMetrics("zone-1")
Expect(err).ToNot(HaveOccurred())
statsCallbacks, err := util_xds.NewStatsCallbacks(metrics, "kds_delta", util_xds.NoopVersionExtractor)
Expect(err).ToNot(HaveOccurred())
reconciler = reconcile.NewReconciler(hasher, snapshotCache, generator, config_core.Zone, statsCallbacks, multitenant.SingleTenant)
})

// The first Reconcile must store a snapshot containing the created Mesh; a second
// Reconcile without any store change must report no change and leave the very same
// snapshot object in the cache.
It("should reconcile snapshot in snapshot cache", func() {
// given
Expect(store.Create(context.Background(), samples.MeshDefault(), core_store.CreateByKey(core_model.DefaultMesh, core_model.NoMesh))).To(Succeed())

// when
err, changed := reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())

// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeTrue())
snapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())
Expect(snapshot.GetResources(string(core_mesh.MeshType))).To(HaveLen(1))

// when reconciled again without resource changes
err, changed = reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())

// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeFalse())
newSnapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())
Expect(snapshot).To(BeIdenticalTo(newSnapshot))
})

// After ForceVersion for the Mesh type, the next Reconcile must report a change even
// though the store is untouched: resources stay equal while the version map differs.
It("should force new version in snapshot cache", func() {
// given
Expect(store.Create(context.Background(), samples.MeshDefault(), core_store.CreateByKey(core_model.DefaultMesh, core_model.NoMesh))).To(Succeed())

// initial reconcile, so there is a baseline snapshot to compare against
err, _ := reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())
Expect(err).ToNot(HaveOccurred())
snapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())

// when
reconciler.ForceVersion(node, core_mesh.MeshType)
err, changed := reconciler.Reconcile(context.Background(), node, changedTypes, logr.Discard())

// then
Expect(err).ToNot(HaveOccurred())
Expect(changed).To(BeTrue())
newSnapshot, err := snapshotCache.GetSnapshot(node.Id)
Expect(err).ToNot(HaveOccurred())
Expect(snapshot.GetResources(string(core_mesh.MeshType))).To(Equal(newSnapshot.GetResources(string(core_mesh.MeshType))))
Expect(snapshot.GetVersionMap(string(core_mesh.MeshType))).ToNot(Equal(newSnapshot.GetVersionMap(string(core_mesh.MeshType))))
})
})
8 changes: 4 additions & 4 deletions pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func New(
util_xds_v3.AdaptDeltaCallbacks(util_xds.LoggingCallbacks{Log: log}),
util_xds_v3.AdaptDeltaCallbacks(statsCallbacks),
// util_xds_v3.AdaptDeltaCallbacks(NewNackBackoff(nackBackoff)),
newKdsRetryForcer(log, cache, hasher),
newKdsRetryForcer(log, reconciler.ForceVersion, nackBackoff, rt.EventBus()),
syncTracker,
status.DefaultStatusTracker(rt, log),
}
Expand Down Expand Up @@ -170,14 +170,14 @@ func kdsVersionExtractor(metadata *structpb.Struct) string {
}

func newKDSContext(log logr.Logger) (envoy_cache.NodeHash, envoy_cache.SnapshotCache) { //nolint:unparam
hasher := hasher{}
hasher := Hasher{}
logger := util_xds.NewLogger(log)
return hasher, envoy_cache.NewSnapshotCache(false, hasher, logger)
}

type hasher struct{}
type Hasher struct{}

func (_ hasher) ID(node *envoy_core.Node) string {
func (_ Hasher) ID(node *envoy_core.Node) string {
tenantID, found := util.TenantFromMetadata(node)
if !found {
return node.Id
Expand Down
31 changes: 17 additions & 14 deletions pkg/kds/v2/server/event_based_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,14 @@ var _ util_watchdog.Watchdog = &EventBasedWatchdog{}
func (e *EventBasedWatchdog) Start(stop <-chan struct{}) {
tenantID, _ := multitenant.TenantFromCtx(e.Ctx)
listener := e.EventBus.Subscribe(func(event events.Event) bool {
resChange, ok := event.(events.ResourceChangedEvent)
if !ok {
return false
switch ev := event.(type) {
case events.ResourceChangedEvent:
_, ok := e.ProvidedTypes[ev.Type]
return ok && ev.TenantID == tenantID
case events.TriggerKDSResyncEvent:
return ev.NodeID == e.Node.Id
}
if resChange.TenantID != tenantID {
return false
}
if _, ok := e.ProvidedTypes[resChange.Type]; !ok {
return false
}
return true
return false
})
flushTicker := e.NewFlushTicker()
defer flushTicker.Stop()
Expand Down Expand Up @@ -95,10 +92,16 @@ func (e *EventBasedWatchdog) Start(stop <-chan struct{}) {
changedTypes = maps.Clone(e.ProvidedTypes)
reasons[ReasonResync] = struct{}{}
case event := <-listener.Recv():
resChange := event.(events.ResourceChangedEvent)
e.Log.V(1).Info("schedule sync for type", "typ", resChange.Type)
changedTypes[resChange.Type] = struct{}{}
reasons[ReasonEvent] = struct{}{}
switch ev := event.(type) {
case events.ResourceChangedEvent:
e.Log.V(1).Info("schedule sync for type", "typ", ev.Type, "event", "ResourceChanged")
changedTypes[ev.Type] = struct{}{}
reasons[ReasonEvent] = struct{}{}
case events.TriggerKDSResyncEvent:
e.Log.V(1).Info("schedule sync for type", "typ", ev.Type, "event", "TriggerKDSResync")
changedTypes[ev.Type] = struct{}{}
reasons[ReasonEvent] = struct{}{}
}
}
}
}
Loading
Loading