From 500f418dca71cc88050a677b0071c91ec507c1f9 Mon Sep 17 00:00:00 2001 From: Fabian Kramm Date: Thu, 15 Aug 2024 10:56:19 +0200 Subject: [PATCH] fix: store watches & delete references --- .../resources/csistoragecapacities/mapper.go | 2 +- pkg/mappings/generic/mapper.go | 2 +- pkg/mappings/generic/recorder.go | 2 +- pkg/mappings/generic/recorder_test.go | 4 +- pkg/mappings/resources/configmaps.go | 2 +- pkg/mappings/resources/secrets.go | 4 +- pkg/mappings/store/mapping.go | 2 + pkg/mappings/store/store.go | 195 ++++++++++--- pkg/mappings/store/store_test.go | 268 +++++++++++++++++- pkg/syncer/synccontext/mapper.go | 17 +- 10 files changed, 439 insertions(+), 59 deletions(-) diff --git a/pkg/controllers/resources/csistoragecapacities/mapper.go b/pkg/controllers/resources/csistoragecapacities/mapper.go index c22d3232c0..72577f42bb 100644 --- a/pkg/controllers/resources/csistoragecapacities/mapper.go +++ b/pkg/controllers/resources/csistoragecapacities/mapper.go @@ -42,7 +42,7 @@ func (s *csiStorageCapacitiesMapper) Migrate(ctx *synccontext.RegisterContext, m HostName: pName, } - err = ctx.Mappings.Store().RecordAndSaveReference(ctx, nameMapping, nameMapping) + err = ctx.Mappings.Store().AddReferenceAndSave(ctx, nameMapping, nameMapping) if err != nil { return fmt.Errorf("error saving reference in store: %w", err) } diff --git a/pkg/mappings/generic/mapper.go b/pkg/mappings/generic/mapper.go index c72449baa5..b6310d93f3 100644 --- a/pkg/mappings/generic/mapper.go +++ b/pkg/mappings/generic/mapper.go @@ -121,7 +121,7 @@ func (n *mapper) Migrate(ctx *synccontext.RegisterContext, mapper synccontext.Ma HostName: pName, } - err = ctx.Mappings.Store().RecordAndSaveReference(ctx, nameMapping, nameMapping) + err = ctx.Mappings.Store().AddReferenceAndSave(ctx, nameMapping, nameMapping) if err != nil { return fmt.Errorf("error saving reference in store: %w", err) } diff --git a/pkg/mappings/generic/recorder.go b/pkg/mappings/generic/recorder.go index c95598aeb0..727e3c6d6e 100644 --- a/pkg/mappings/generic/recorder.go +++ b/pkg/mappings/generic/recorder.go @@ -84,7 +84,7 @@ func RecordMapping(ctx *synccontext.SyncContext, pName, vName types.NamespacedNa } // record the reference - err := ctx.Mappings.Store().RecordReference(ctx, synccontext.NameMapping{ + err := ctx.Mappings.Store().AddReference(ctx, synccontext.NameMapping{ GroupVersionKind: gvk, HostName: pName, diff --git a/pkg/mappings/generic/recorder_test.go b/pkg/mappings/generic/recorder_test.go index c4e220caeb..f5de405a89 100644 --- a/pkg/mappings/generic/recorder_test.go +++ b/pkg/mappings/generic/recorder_test.go @@ -55,7 +55,7 @@ func TestRecorder(t *testing.T) { VirtualName: vTest, HostName: pTestOther, } - err = mappingsStore.RecordReference(syncContext.Context, conflictingMapping, conflictingMapping) + err = mappingsStore.AddReference(syncContext.Context, conflictingMapping, conflictingMapping) assert.NilError(t, err) // check that mapping is empty @@ -86,7 +86,7 @@ func TestRecorder(t *testing.T) { HostName: retTest, VirtualName: pTestOther, } - err = mappingsStore.RecordReference(syncContext.Context, conflictingMapping, conflictingMapping) + err = mappingsStore.AddReference(syncContext.Context, conflictingMapping, conflictingMapping) assert.ErrorContains(t, err, "there is already another name mapping") // check if managed 1 diff --git a/pkg/mappings/resources/configmaps.go b/pkg/mappings/resources/configmaps.go index b7c32b12ea..a8ac0670e5 100644 --- a/pkg/mappings/resources/configmaps.go +++ b/pkg/mappings/resources/configmaps.go @@ -86,7 +86,7 @@ func (s *configMapsMapper) Migrate(ctx *synccontext.RegisterContext, mapper sync for _, configMap := range configMapsFromPod(item) { pName := mapper.VirtualToHost(ctx.ToSyncContext("migrate-pod"), configMap, nil) if pName.Name != "" { - err = ctx.Mappings.Store().RecordAndSaveReference(ctx, synccontext.NameMapping{ + err = ctx.Mappings.Store().AddReferenceAndSave(ctx, synccontext.NameMapping{ GroupVersionKind: mappings.ConfigMaps(), VirtualName: configMap, HostName: pName, diff --git a/pkg/mappings/resources/secrets.go b/pkg/mappings/resources/secrets.go index e205d2f59d..0148fb7ac8 100644 --- a/pkg/mappings/resources/secrets.go +++ b/pkg/mappings/resources/secrets.go @@ -58,7 +58,7 @@ func (s *secretsMapper) Migrate(ctx *synccontext.RegisterContext, mapper synccon for _, secret := range secretNamesFromPod(syncContext, item) { pName := mapper.VirtualToHost(syncContext, secret, nil) if pName.Name != "" { - err = ctx.Mappings.Store().RecordAndSaveReference(ctx, synccontext.NameMapping{ + err = ctx.Mappings.Store().AddReferenceAndSave(ctx, synccontext.NameMapping{ GroupVersionKind: mappings.Secrets(), VirtualName: secret, HostName: pName, @@ -90,7 +90,7 @@ func (s *secretsMapper) Migrate(ctx *synccontext.RegisterContext, mapper synccon for _, secret := range secretNamesFromIngress(syncContext, item) { pName := mapper.VirtualToHost(syncContext, secret, nil) if pName.Name != "" { - err = ctx.Mappings.Store().RecordAndSaveReference(ctx, synccontext.NameMapping{ + err = ctx.Mappings.Store().AddReferenceAndSave(ctx, synccontext.NameMapping{ GroupVersionKind: mappings.Secrets(), VirtualName: secret, HostName: pName, diff --git a/pkg/mappings/store/mapping.go b/pkg/mappings/store/mapping.go index dee7574a8c..e7471d0b3a 100644 --- a/pkg/mappings/store/mapping.go +++ b/pkg/mappings/store/mapping.go @@ -5,6 +5,8 @@ import "github.com/loft-sh/vcluster/pkg/syncer/synccontext" type Mapping struct { synccontext.NameMapping `json:",inline"` + Sender string `json:"sender,omitempty"` + References []synccontext.NameMapping `json:"references,omitempty"` changed bool `json:"-"` diff --git a/pkg/mappings/store/store.go b/pkg/mappings/store/store.go index ff696240a5..beea417e83 100644 --- a/pkg/mappings/store/store.go +++ b/pkg/mappings/store/store.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/loft-sh/vcluster/pkg/scheme" "github.com/loft-sh/vcluster/pkg/syncer/synccontext" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -25,6 +26,8 @@ func NewStore(ctx context.Context, cachedVirtualClient, cachedHostClient client. store := &Store{ backend: backend, + sender: uuid.NewString(), + cachedVirtualClient: cachedVirtualClient, cachedHostClient: cachedHostClient, @@ -48,6 +51,8 @@ func NewStore(ctx context.Context, cachedVirtualClient, cachedHostClient client. type Store struct { m sync.RWMutex + sender string + cachedVirtualClient client.Client cachedHostClient client.Client @@ -106,60 +111,75 @@ func (s *Store) garbageCollectMappings(ctx context.Context) { } func (s *Store) garbageCollectMapping(ctx context.Context, mapping *Mapping) error { - // build the object we can query - obj, err := scheme.Scheme.New(mapping.GroupVersionKind) + // check if object exists + exists, err := s.objectExists(ctx, mapping.NameMapping) if err != nil { - if !runtime.IsNotRegisteredError(err) { - return fmt.Errorf("create object: %w", err) - } - - obj = &unstructured.Unstructured{} + return err + } else if exists { + return nil } - uList, ok := obj.(*unstructured.Unstructured) - if ok { - uList.SetKind(mapping.GroupVersionKind.Kind) - uList.SetAPIVersion(mapping.GroupVersionKind.GroupVersion().String()) + // delete the mapping + err = s.deleteMapping(ctx, mapping) + if err != nil { + return err } - // check if virtual object exists - virtualExists := true - err = s.cachedVirtualClient.Get(ctx, types.NamespacedName{Name: mapping.VirtualName.Name, Namespace: mapping.VirtualName.Namespace}, obj.DeepCopyObject().(client.Object)) - if err != nil { - if !kerrors.IsNotFound(err) { - // TODO: filter out other allowed errors here could be Forbidden, Type not found etc. - klog.FromContext(ctx).Info("Error retrieving virtual object", "virtualObject", mapping.Virtual().String()) - } + klog.FromContext(ctx).Info("Remove mapping as both virtual and host were not found", "mapping", mapping.String()) + return nil +} + +func (s *Store) deleteMapping(ctx context.Context, mapping *Mapping) error { + // set sender + mapping.Sender = s.sender - virtualExists = false + // remove mapping from backend + err := s.backend.Delete(ctx, mapping) + if err != nil { + return fmt.Errorf("remove mapping from backend: %w", err) } - // check if host object exists - hostExists := true - err = s.cachedVirtualClient.Get(ctx, types.NamespacedName{Name: mapping.HostName.Name, Namespace: mapping.HostName.Namespace}, obj.DeepCopyObject().(client.Object)) + s.removeMapping(mapping) + return nil +} + +func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMapping) (bool, error) { + // build the object we can query + obj, err := scheme.Scheme.New(nameMapping.GroupVersionKind) if err != nil { - if !kerrors.IsNotFound(err) { - // TODO: filter out other allowed errors here could be Forbidden, Type not found etc. - klog.FromContext(ctx).Info("Error retrieving host object", "hostObject", mapping.Host().String()) + if !runtime.IsNotRegisteredError(err) { + return false, fmt.Errorf("create object: %w", err) } - hostExists = false + obj = &unstructured.Unstructured{} } - // remove mapping if both objects are not found anymore - if virtualExists || hostExists { - return nil + // set kind & apiVersion if unstructured + uObject, ok := obj.(*unstructured.Unstructured) + if ok { + uObject.SetKind(nameMapping.GroupVersionKind.Kind) + uObject.SetAPIVersion(nameMapping.GroupVersionKind.GroupVersion().String()) } - // remove mapping from backend - err = s.backend.Delete(ctx, mapping) - if err != nil { - return fmt.Errorf("remove mapping from backend: %w", err) + // check if virtual object exists + err = s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, obj.DeepCopyObject().(client.Object)) + if err == nil { + return true, nil + } else if !kerrors.IsNotFound(err) { + // TODO: filter out other allowed errors here could be Forbidden, Type not found etc. + klog.FromContext(ctx).Info("Error retrieving virtual object", "virtualObject", nameMapping.Virtual().String()) } - klog.FromContext(ctx).Info("Remove mapping as both virtual and host were not found", "mapping", mapping.String()) - s.removeMapping(mapping) - return nil + // check if host object exists + err = s.cachedHostClient.Get(ctx, nameMapping.HostName, obj.DeepCopyObject().(client.Object)) + if err == nil { + return true, nil + } else if !kerrors.IsNotFound(err) { + // TODO: filter out other allowed errors here could be Forbidden, Type not found etc. + klog.FromContext(ctx).Info("Error retrieving host object", "hostObject", nameMapping.Host().String()) + } + + return false, nil } func (s *Store) start(ctx context.Context) error { @@ -204,6 +224,11 @@ func (s *Store) handleEvent(ctx context.Context, watchEvent BackendWatchResponse } for _, event := range watchEvent.Events { + // ignore events sent by us + if event.Mapping.Sender == s.sender { + continue + } + klog.FromContext(ctx).V(1).Info("mapping store received event", "type", event.Type, "mapping", event.Mapping.String()) // remove mapping in any case @@ -245,8 +270,58 @@ func (s *Store) VirtualToHostName(_ context.Context, vObj synccontext.Object) (t return pObjLookup.Object.NamespacedName, ok } -func (s *Store) RecordAndSaveReference(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error { - err := s.RecordReference(ctx, nameMapping, belongsTo) +func (s *Store) DeleteReferenceAndSave(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error { + err := s.DeleteReference(ctx, nameMapping, belongsTo) + if err != nil { + return err + } + + return s.SaveMapping(ctx, belongsTo) +} + +func (s *Store) DeleteReference(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error { + // we don't record incomplete mappings + if nameMapping.Host().Empty() || nameMapping.Virtual().Empty() { + return nil + } + + s.m.Lock() + defer s.m.Unlock() + + // check if there is already a mapping + mapping, ok := s.findMapping(belongsTo) + if !ok { + return nil + } + + // check if reference already exists + newReferences := make([]synccontext.NameMapping, 0, len(mapping.References)-1) + for _, reference := range mapping.References { + if reference.Equals(nameMapping) { + continue + } + + newReferences = append(newReferences, reference) + } + + // check if we found the reference + if len(newReferences) == len(mapping.References) { + return nil + } + + // signal mapping was changed + mapping.References = newReferences + mapping.changed = true + klog.FromContext(ctx).Info("Delete mapping reference", "host", nameMapping.Host().String(), "virtual", nameMapping.Virtual().String(), "owner", mapping.Virtual().String()) + + // add to lookup maps + s.removeNameFromMaps(mapping, nameMapping.Virtual(), nameMapping.Host()) + dispatchAll(s.watches[nameMapping.GroupVersionKind], nameMapping) + return nil +} + +func (s *Store) AddReferenceAndSave(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error { + err := s.AddReference(ctx, nameMapping, belongsTo) if err != nil { return err } @@ -254,7 +329,7 @@ func (s *Store) RecordAndSaveReference(ctx context.Context, nameMapping, belongs return s.SaveMapping(ctx, belongsTo) } -func (s *Store) RecordReference(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error { +func (s *Store) AddReference(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error { // we don't record incomplete mappings if nameMapping.Host().Empty() || nameMapping.Virtual().Empty() { return nil @@ -290,7 +365,7 @@ func (s *Store) RecordReference(ctx context.Context, nameMapping, belongsTo sync // add mapping mapping.changed = true - klog.FromContext(ctx).Info("Add name mapping", "host", nameMapping.Host().String(), "virtual", nameMapping.Virtual().String(), "owner", mapping.Virtual().String()) + klog.FromContext(ctx).Info("Add mapping reference", "host", nameMapping.Host().String(), "virtual", nameMapping.Virtual().String(), "owner", mapping.Virtual().String()) mapping.References = append(mapping.References, nameMapping) // add to lookup maps @@ -316,6 +391,9 @@ func (s *Store) SaveMapping(ctx context.Context, nameMapping synccontext.NameMap return nil } + // set sender + mapping.Sender = s.sender + // save mapping klog.FromContext(ctx).Info("Save object mappings in store", "mapping", mapping.String()) err := s.backend.Save(ctx, mapping) @@ -327,15 +405,45 @@ func (s *Store) SaveMapping(ctx context.Context, nameMapping synccontext.NameMap return nil } -func (s *Store) ReferencesTo(ctx context.Context, vObj synccontext.Object) []synccontext.NameMapping { +func (s *Store) DeleteMapping(ctx context.Context, nameMapping synccontext.NameMapping) error { // we ignore empty mappings here - if vObj.Empty() { + if nameMapping.Empty() { + return nil + } + + s.m.Lock() + defer s.m.Unlock() + + // check if there is already a mapping + mapping, ok := s.findMapping(nameMapping) + if !ok { return nil } + // delete the mapping + err := s.deleteMapping(ctx, mapping) + if err != nil { + return err + } + + klog.FromContext(ctx).Info("Remove object mappings in store", "mapping", mapping.String()) + return nil +} + +func (s *Store) ReferencesTo(ctx context.Context, vObj synccontext.Object) []synccontext.NameMapping { s.m.Lock() defer s.m.Unlock() + retReferences := s.referencesTo(vObj) + klog.FromContext(ctx).V(1).Info("Found references for object", "object", vObj.String(), "references", len(retReferences)) + return retReferences +} + +func (s *Store) referencesTo(vObj synccontext.Object) []synccontext.NameMapping { + if vObj.Empty() { + return nil + } + hostNameLookup, ok := s.virtualToHostName[vObj] if !ok { return nil @@ -356,7 +464,6 @@ func (s *Store) ReferencesTo(ctx context.Context, vObj synccontext.Object) []syn retReferences = append(retReferences, reference.NameMapping) } - klog.FromContext(ctx).V(1).Info("Found references for object", "object", vObj.String(), "references", len(retReferences)) return retReferences } diff --git a/pkg/mappings/store/store_test.go b/pkg/mappings/store/store_test.go index b6d1279533..91f11af17f 100644 --- a/pkg/mappings/store/store_test.go +++ b/pkg/mappings/store/store_test.go @@ -3,15 +3,263 @@ package store import ( "context" "testing" + "time" "github.com/loft-sh/vcluster/pkg/scheme" "github.com/loft-sh/vcluster/pkg/syncer/synccontext" + "github.com/loft-sh/vcluster/pkg/util/random" testingutil "github.com/loft-sh/vcluster/pkg/util/testing" "gotest.tools/v3/assert" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" ) +func TestDeleteReference(t *testing.T) { + ctx := context.TODO() + vClient := testingutil.NewFakeClient(scheme.Scheme) + pClient := testingutil.NewFakeClient(scheme.Scheme) + backend := NewMemoryBackend() + genericStore, err := NewStore(ctx, vClient, pClient, backend) + assert.NilError(t, err) + + store, ok := genericStore.(*Store) + assert.Equal(t, true, ok) + + secretMapping := NewRandomMapping(corev1.SchemeGroupVersion.WithKind("Secret")) + otherSecretMapping := NewRandomMapping(corev1.SchemeGroupVersion.WithKind("Secret")) + podMapping := NewRandomMapping(corev1.SchemeGroupVersion.WithKind("Pod")) + otherPodMapping := NewRandomMapping(corev1.SchemeGroupVersion.WithKind("Pod")) + + err = store.AddReference(ctx, podMapping, podMapping) + assert.NilError(t, err) + err = store.AddReference(ctx, otherPodMapping, otherPodMapping) + assert.NilError(t, err) + err = store.AddReference(ctx, secretMapping, secretMapping) + assert.NilError(t, err) + err = store.AddReference(ctx, secretMapping, podMapping) + assert.NilError(t, err) + err = store.AddReference(ctx, secretMapping, otherPodMapping) + assert.NilError(t, err) + err = store.AddReference(ctx, otherSecretMapping, podMapping) + assert.NilError(t, err) + assert.Equal(t, 3, len(store.mappings)) + assert.Equal(t, 4, len(store.hostToVirtualName)) + assert.Equal(t, 4, len(store.virtualToHostName)) + assert.Equal(t, 2, len(store.ReferencesTo(ctx, secretMapping.Virtual()))) + assert.Equal(t, 1, len(store.ReferencesTo(ctx, otherSecretMapping.Virtual()))) + + err = store.DeleteReference(ctx, otherSecretMapping, podMapping) + assert.NilError(t, err) + assert.Equal(t, 1, len(store.mappings[podMapping].References)) + assert.Equal(t, 3, len(store.mappings)) + assert.Equal(t, 3, len(store.hostToVirtualName)) + assert.Equal(t, 3, len(store.virtualToHostName)) + assert.Equal(t, 2, len(store.ReferencesTo(ctx, secretMapping.Virtual()))) + assert.Equal(t, 0, len(store.ReferencesTo(ctx, otherSecretMapping.Virtual()))) + + err = store.DeleteMapping(ctx, podMapping) + assert.NilError(t, err) + assert.Equal(t, 2, len(store.mappings)) + assert.Equal(t, 2, len(store.hostToVirtualName)) + assert.Equal(t, 2, len(store.virtualToHostName)) + assert.Equal(t, 1, len(store.ReferencesTo(ctx, secretMapping.Virtual()))) + assert.Equal(t, 0, len(store.ReferencesTo(ctx, otherSecretMapping.Virtual()))) + + err = store.DeleteReference(ctx, secretMapping, otherPodMapping) + assert.NilError(t, err) + assert.Equal(t, 2, len(store.mappings)) + assert.Equal(t, 2, len(store.hostToVirtualName)) + assert.Equal(t, 2, len(store.virtualToHostName)) + assert.Equal(t, 0, len(store.ReferencesTo(ctx, secretMapping.Virtual()))) + assert.Equal(t, 0, len(store.ReferencesTo(ctx, otherSecretMapping.Virtual()))) + + err = store.DeleteMapping(ctx, secretMapping) + assert.NilError(t, err) + assert.Equal(t, 1, len(store.mappings)) + assert.Equal(t, 1, len(store.hostToVirtualName)) + assert.Equal(t, 1, len(store.virtualToHostName)) + assert.Equal(t, 0, len(store.ReferencesTo(ctx, secretMapping.Virtual()))) + assert.Equal(t, 0, len(store.ReferencesTo(ctx, otherSecretMapping.Virtual()))) +} + +func TestWatching(t *testing.T) { + ctx := context.TODO() + vClient := testingutil.NewFakeClient(scheme.Scheme) + pClient := testingutil.NewFakeClient(scheme.Scheme) + backend := NewMemoryBackend() + genericStore, err := NewStore(ctx, vClient, pClient, backend) + assert.NilError(t, err) + + store, ok := genericStore.(*Store) + assert.Equal(t, true, ok) + + secretMapping := NewRandomMapping(corev1.SchemeGroupVersion.WithKind("Secret")) + otherSecretMapping := NewRandomMapping(corev1.SchemeGroupVersion.WithKind("Secret")) + podMapping := NewRandomMapping(corev1.SchemeGroupVersion.WithKind("Pod")) + + // check save + err = backend.Save(ctx, &Mapping{ + NameMapping: secretMapping, + Sender: "doesnotexist", + References: []synccontext.NameMapping{ + podMapping, + }, + }) + assert.NilError(t, err) + + // wait for event to arrive + err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second, true, func(_ context.Context) (bool, error) { + store.m.Lock() + defer store.m.Unlock() + return len(store.mappings) == 1 && len(store.hostToVirtualName) == 2 && len(store.virtualToHostName) == 2 && len(store.referencesTo(podMapping.Virtual())) == 1, nil + }) + assert.NilError(t, err) + + // check save + err = backend.Save(ctx, &Mapping{ + NameMapping: otherSecretMapping, + Sender: "doesnotexist", + References: []synccontext.NameMapping{ + podMapping, + }, + }) + assert.NilError(t, err) + + // wait for event to arrive + err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second, true, func(_ context.Context) (bool, error) { + store.m.Lock() + defer store.m.Unlock() + return len(store.mappings) == 2 && len(store.hostToVirtualName) == 3 && len(store.virtualToHostName) == 3 && len(store.referencesTo(podMapping.Virtual())) == 2, nil + }) + assert.NilError(t, err) + + // check update + err = backend.Save(ctx, &Mapping{ + NameMapping: secretMapping, + Sender: "doesnotexist", + }) + assert.NilError(t, err) + + // wait for event to arrive + err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second, true, func(_ context.Context) (bool, error) { + store.m.Lock() + defer store.m.Unlock() + return len(store.mappings) == 2 && len(store.hostToVirtualName) == 3 && len(store.virtualToHostName) == 3 && len(store.referencesTo(podMapping.Virtual())) == 1, nil + }) + assert.NilError(t, err) + + // check delete + err = backend.Delete(ctx, &Mapping{ + NameMapping: secretMapping, + Sender: "doesnotexist", + }) + assert.NilError(t, err) + + // wait for event to arrive + err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second, true, func(_ context.Context) (bool, error) { + store.m.Lock() + defer store.m.Unlock() + return len(store.mappings) == 1 && len(store.hostToVirtualName) == 2 && len(store.virtualToHostName) == 2 && len(store.referencesTo(podMapping.Virtual())) == 1, nil + }) + assert.NilError(t, err) + + // check delete + err = backend.Delete(ctx, &Mapping{ + NameMapping: otherSecretMapping, + Sender: "doesnotexist", + }) + assert.NilError(t, err) + + // wait for event to arrive + err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second, true, func(_ context.Context) (bool, error) { + store.m.Lock() + defer store.m.Unlock() + return len(store.mappings) == 0 && len(store.hostToVirtualName) == 0 && len(store.virtualToHostName) == 0 && len(store.referencesTo(podMapping.Virtual())) == 0, nil + }) + assert.NilError(t, err) +} + +func TestGarbageCollectMappings(t *testing.T) { + ctx := context.TODO() + vClient := testingutil.NewFakeClient(scheme.Scheme) + pClient := testingutil.NewFakeClient(scheme.Scheme) + genericStore, err := NewStore(ctx, vClient, pClient, NewMemoryBackend()) + assert.NilError(t, err) + + store, ok := genericStore.(*Store) + assert.Equal(t, true, ok) + + secretMapping := NewRandomMapping(corev1.SchemeGroupVersion.WithKind("Secret")) + podMapping := NewRandomMapping(corev1.SchemeGroupVersion.WithKind("Pod")) + + // record reference + err = store.AddReference(ctx, secretMapping, secretMapping) + assert.NilError(t, err) + err = store.AddReference(ctx, podMapping, podMapping) + assert.NilError(t, err) + assert.Equal(t, 2, len(store.mappings)) + assert.Equal(t, 2, len(store.hostToVirtualName)) + assert.Equal(t, 2, len(store.virtualToHostName)) + assert.Equal(t, 0, len(store.ReferencesTo(ctx, podMapping.Virtual()))) + assert.Equal(t, 0, len(store.ReferencesTo(ctx, secretMapping.Virtual()))) + err = store.AddReference(ctx, secretMapping, podMapping) + assert.NilError(t, err) + assert.Equal(t, 1, len(store.ReferencesTo(ctx, secretMapping.Virtual()))) + + // garbage collect mappings + store.garbageCollectMappings(context.TODO()) + assert.Equal(t, 0, len(store.mappings)) + assert.Equal(t, 0, len(store.hostToVirtualName)) + assert.Equal(t, 0, len(store.virtualToHostName)) + + // record reference + err = store.AddReference(ctx, secretMapping, secretMapping) + assert.NilError(t, err) + err = store.AddReference(ctx, podMapping, podMapping) + assert.NilError(t, err) + err = store.AddReference(ctx, secretMapping, podMapping) + assert.NilError(t, err) + vPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: podMapping.VirtualName.Name, Namespace: podMapping.VirtualName.Namespace}} + err = vClient.Create(ctx, vPod) + assert.NilError(t, err) + assert.Equal(t, 2, len(store.mappings)) + assert.Equal(t, 2, len(store.hostToVirtualName)) + assert.Equal(t, 2, len(store.virtualToHostName)) + assert.Equal(t, 1, len(store.ReferencesTo(ctx, secretMapping.Virtual()))) + + // garbage collect mappings + store.garbageCollectMappings(context.TODO()) + assert.Equal(t, 1, len(store.mappings)) + assert.Equal(t, 2, len(store.hostToVirtualName)) + assert.Equal(t, 2, len(store.virtualToHostName)) + assert.Equal(t, 1, len(store.ReferencesTo(ctx, secretMapping.Virtual()))) + + // make sure we cannot add a new conflicting mapping + conflictingMapping := synccontext.NameMapping{ + GroupVersionKind: secretMapping.GroupVersionKind, + VirtualName: secretMapping.VirtualName, + HostName: types.NamespacedName{Name: "other", Namespace: "other"}, + } + err = store.AddReference(ctx, conflictingMapping, conflictingMapping) + assert.ErrorContains(t, err, "there is already another name mapping") + err = store.AddReference(ctx, conflictingMapping, podMapping) + assert.ErrorContains(t, err, "there is already another name mapping") + + // delete pod + err = vClient.Delete(ctx, vPod) + assert.NilError(t, err) + + // garbage collect mappings + store.garbageCollectMappings(context.TODO()) + assert.Equal(t, 0, len(store.mappings)) + assert.Equal(t, 0, len(store.hostToVirtualName)) + assert.Equal(t, 0, len(store.virtualToHostName)) + assert.Equal(t, 0, len(store.ReferencesTo(ctx, secretMapping.Virtual()))) +} + func TestStore(t *testing.T) { genericStore, err := NewStore(context.TODO(), testingutil.NewFakeClient(scheme.Scheme), testingutil.NewFakeClient(scheme.Scheme), NewMemoryBackend()) assert.NilError(t, err) @@ -36,7 +284,7 @@ func TestStore(t *testing.T) { } // record reference - err = store.RecordReference(baseCtx, synccontext.NameMapping{ + err = store.AddReference(baseCtx, synccontext.NameMapping{ GroupVersionKind: gvk, HostName: hostName, VirtualName: virtualName, @@ -76,7 +324,7 @@ func TestStore(t *testing.T) { HostName: hostName, VirtualName: virtualName, } - err = store.RecordReference(baseCtx, nameMapping, baseMapping) + err = store.AddReference(baseCtx, nameMapping, baseMapping) assert.NilError(t, err) assert.Equal(t, 1, len(store.mappings)) assert.Equal(t, 1, len(store.hostToVirtualName)) @@ -118,7 +366,7 @@ func TestRecordMapping(t *testing.T) { Namespace: "vcluster-namespace", Name: "vcluster-kube-root-ca.crt-x-vcluster", } - err = store.RecordReference(baseCtx, synccontext.NameMapping{ + err = store.AddReference(baseCtx, synccontext.NameMapping{ GroupVersionKind: gvk, VirtualName: virtual, HostName: host2, @@ -129,3 +377,17 @@ func TestRecordMapping(t *testing.T) { assert.NilError(t, err) assert.Equal(t, 0, len(store.mappings)) } + +func NewRandomMapping(gvk schema.GroupVersionKind) synccontext.NameMapping { + return synccontext.NameMapping{ + GroupVersionKind: gvk, + VirtualName: types.NamespacedName{ + Name: random.String(32), + Namespace: random.String(32), + }, + HostName: types.NamespacedName{ + Name: random.String(32), + Namespace: random.String(32), + }, + } +} diff --git a/pkg/syncer/synccontext/mapper.go b/pkg/syncer/synccontext/mapper.go index f192123fab..473e83008d 100644 --- a/pkg/syncer/synccontext/mapper.go +++ b/pkg/syncer/synccontext/mapper.go @@ -49,15 +49,24 @@ type MappingsStore interface { // HasVirtualObject checks if the store has a mapping for the virtual object HasVirtualObject(ctx context.Context, pObj Object) bool - // RecordAndSaveReference records a reference mapping and directly saves it - RecordAndSaveReference(ctx context.Context, nameMapping, belongsTo NameMapping) error + // AddReferenceAndSave adds a reference mapping and directly saves the mapping + AddReferenceAndSave(ctx context.Context, nameMapping, belongsTo NameMapping) error - // RecordReference records a reference mapping - RecordReference(ctx context.Context, nameMapping, belongsTo NameMapping) error + // DeleteReferenceAndSave deletes a reference mapping and directly saves the mapping + DeleteReferenceAndSave(ctx context.Context, nameMapping, belongsTo NameMapping) error + + // AddReference adds a reference mapping + AddReference(ctx context.Context, nameMapping, belongsTo NameMapping) error + + // DeleteReference deletes a reference mapping + DeleteReference(ctx context.Context, nameMapping, belongsTo NameMapping) error // SaveMapping saves the mapping in the backing store SaveMapping(ctx context.Context, mapping NameMapping) error + // DeleteMapping deletes the mapping in the backing store + DeleteMapping(ctx context.Context, mapping NameMapping) error + // ReferencesTo retrieves all known references to this object ReferencesTo(ctx context.Context, vObj Object) []NameMapping