Skip to content

Commit

Permalink
Merge pull request #2071 from FabianKramm/main
Browse files Browse the repository at this point in the history
fix: store watches & delete references
  • Loading branch information
FabianKramm authored Aug 16, 2024
2 parents 5dc220a + 500f418 commit 7f688cc
Show file tree
Hide file tree
Showing 10 changed files with 439 additions and 59 deletions.
2 changes: 1 addition & 1 deletion pkg/controllers/resources/csistoragecapacities/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *csiStorageCapacitiesMapper) Migrate(ctx *synccontext.RegisterContext, m
HostName: pName,
}

err = ctx.Mappings.Store().RecordAndSaveReference(ctx, nameMapping, nameMapping)
err = ctx.Mappings.Store().AddReferenceAndSave(ctx, nameMapping, nameMapping)
if err != nil {
return fmt.Errorf("error saving reference in store: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mappings/generic/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (n *mapper) Migrate(ctx *synccontext.RegisterContext, mapper synccontext.Ma
HostName: pName,
}

err = ctx.Mappings.Store().RecordAndSaveReference(ctx, nameMapping, nameMapping)
err = ctx.Mappings.Store().AddReferenceAndSave(ctx, nameMapping, nameMapping)
if err != nil {
return fmt.Errorf("error saving reference in store: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mappings/generic/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func RecordMapping(ctx *synccontext.SyncContext, pName, vName types.NamespacedNa
}

// record the reference
err := ctx.Mappings.Store().RecordReference(ctx, synccontext.NameMapping{
err := ctx.Mappings.Store().AddReference(ctx, synccontext.NameMapping{
GroupVersionKind: gvk,

HostName: pName,
Expand Down
4 changes: 2 additions & 2 deletions pkg/mappings/generic/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestRecorder(t *testing.T) {
VirtualName: vTest,
HostName: pTestOther,
}
err = mappingsStore.RecordReference(syncContext.Context, conflictingMapping, conflictingMapping)
err = mappingsStore.AddReference(syncContext.Context, conflictingMapping, conflictingMapping)
assert.NilError(t, err)

// check that mapping is empty
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestRecorder(t *testing.T) {
HostName: retTest,
VirtualName: pTestOther,
}
err = mappingsStore.RecordReference(syncContext.Context, conflictingMapping, conflictingMapping)
err = mappingsStore.AddReference(syncContext.Context, conflictingMapping, conflictingMapping)
assert.ErrorContains(t, err, "there is already another name mapping")

// check if managed 1
Expand Down
2 changes: 1 addition & 1 deletion pkg/mappings/resources/configmaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *configMapsMapper) Migrate(ctx *synccontext.RegisterContext, mapper sync
for _, configMap := range configMapsFromPod(item) {
pName := mapper.VirtualToHost(ctx.ToSyncContext("migrate-pod"), configMap, nil)
if pName.Name != "" {
err = ctx.Mappings.Store().RecordAndSaveReference(ctx, synccontext.NameMapping{
err = ctx.Mappings.Store().AddReferenceAndSave(ctx, synccontext.NameMapping{
GroupVersionKind: mappings.ConfigMaps(),
VirtualName: configMap,
HostName: pName,
Expand Down
4 changes: 2 additions & 2 deletions pkg/mappings/resources/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *secretsMapper) Migrate(ctx *synccontext.RegisterContext, mapper synccon
for _, secret := range secretNamesFromPod(syncContext, item) {
pName := mapper.VirtualToHost(syncContext, secret, nil)
if pName.Name != "" {
err = ctx.Mappings.Store().RecordAndSaveReference(ctx, synccontext.NameMapping{
err = ctx.Mappings.Store().AddReferenceAndSave(ctx, synccontext.NameMapping{
GroupVersionKind: mappings.Secrets(),
VirtualName: secret,
HostName: pName,
Expand Down Expand Up @@ -90,7 +90,7 @@ func (s *secretsMapper) Migrate(ctx *synccontext.RegisterContext, mapper synccon
for _, secret := range secretNamesFromIngress(syncContext, item) {
pName := mapper.VirtualToHost(syncContext, secret, nil)
if pName.Name != "" {
err = ctx.Mappings.Store().RecordAndSaveReference(ctx, synccontext.NameMapping{
err = ctx.Mappings.Store().AddReferenceAndSave(ctx, synccontext.NameMapping{
GroupVersionKind: mappings.Secrets(),
VirtualName: secret,
HostName: pName,
Expand Down
2 changes: 2 additions & 0 deletions pkg/mappings/store/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import "github.com/loft-sh/vcluster/pkg/syncer/synccontext"
type Mapping struct {
synccontext.NameMapping `json:",inline"`

Sender string `json:"sender,omitempty"`

References []synccontext.NameMapping `json:"references,omitempty"`

changed bool `json:"-"`
Expand Down
195 changes: 151 additions & 44 deletions pkg/mappings/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/loft-sh/vcluster/pkg/scheme"
"github.com/loft-sh/vcluster/pkg/syncer/synccontext"
kerrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -25,6 +26,8 @@ func NewStore(ctx context.Context, cachedVirtualClient, cachedHostClient client.
store := &Store{
backend: backend,

sender: uuid.NewString(),

cachedVirtualClient: cachedVirtualClient,
cachedHostClient: cachedHostClient,

Expand All @@ -48,6 +51,8 @@ func NewStore(ctx context.Context, cachedVirtualClient, cachedHostClient client.
type Store struct {
m sync.RWMutex

sender string

cachedVirtualClient client.Client
cachedHostClient client.Client

Expand Down Expand Up @@ -106,60 +111,75 @@ func (s *Store) garbageCollectMappings(ctx context.Context) {
}

func (s *Store) garbageCollectMapping(ctx context.Context, mapping *Mapping) error {
// build the object we can query
obj, err := scheme.Scheme.New(mapping.GroupVersionKind)
// check if object exists
exists, err := s.objectExists(ctx, mapping.NameMapping)
if err != nil {
if !runtime.IsNotRegisteredError(err) {
return fmt.Errorf("create object: %w", err)
}

obj = &unstructured.Unstructured{}
return err
} else if exists {
return nil
}

uList, ok := obj.(*unstructured.Unstructured)
if ok {
uList.SetKind(mapping.GroupVersionKind.Kind)
uList.SetAPIVersion(mapping.GroupVersionKind.GroupVersion().String())
// delete the mapping
err = s.deleteMapping(ctx, mapping)
if err != nil {
return err
}

// check if virtual object exists
virtualExists := true
err = s.cachedVirtualClient.Get(ctx, types.NamespacedName{Name: mapping.VirtualName.Name, Namespace: mapping.VirtualName.Namespace}, obj.DeepCopyObject().(client.Object))
if err != nil {
if !kerrors.IsNotFound(err) {
// TODO: filter out other allowed errors here could be Forbidden, Type not found etc.
klog.FromContext(ctx).Info("Error retrieving virtual object", "virtualObject", mapping.Virtual().String())
}
klog.FromContext(ctx).Info("Remove mapping as both virtual and host were not found", "mapping", mapping.String())
return nil
}

func (s *Store) deleteMapping(ctx context.Context, mapping *Mapping) error {
// set sender
mapping.Sender = s.sender

virtualExists = false
// remove mapping from backend
err := s.backend.Delete(ctx, mapping)
if err != nil {
return fmt.Errorf("remove mapping from backend: %w", err)
}

// check if host object exists
hostExists := true
err = s.cachedVirtualClient.Get(ctx, types.NamespacedName{Name: mapping.HostName.Name, Namespace: mapping.HostName.Namespace}, obj.DeepCopyObject().(client.Object))
s.removeMapping(mapping)
return nil
}

func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMapping) (bool, error) {
// build the object we can query
obj, err := scheme.Scheme.New(nameMapping.GroupVersionKind)
if err != nil {
if !kerrors.IsNotFound(err) {
// TODO: filter out other allowed errors here could be Forbidden, Type not found etc.
klog.FromContext(ctx).Info("Error retrieving host object", "hostObject", mapping.Host().String())
if !runtime.IsNotRegisteredError(err) {
return false, fmt.Errorf("create object: %w", err)
}

hostExists = false
obj = &unstructured.Unstructured{}
}

// remove mapping if both objects are not found anymore
if virtualExists || hostExists {
return nil
// set kind & apiVersion if unstructured
uObject, ok := obj.(*unstructured.Unstructured)
if ok {
uObject.SetKind(nameMapping.GroupVersionKind.Kind)
uObject.SetAPIVersion(nameMapping.GroupVersionKind.GroupVersion().String())
}

// remove mapping from backend
err = s.backend.Delete(ctx, mapping)
if err != nil {
return fmt.Errorf("remove mapping from backend: %w", err)
// check if virtual object exists
err = s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, obj.DeepCopyObject().(client.Object))
if err == nil {
return true, nil
} else if !kerrors.IsNotFound(err) {
// TODO: filter out other allowed errors here could be Forbidden, Type not found etc.
klog.FromContext(ctx).Info("Error retrieving virtual object", "virtualObject", nameMapping.Virtual().String())
}

klog.FromContext(ctx).Info("Remove mapping as both virtual and host were not found", "mapping", mapping.String())
s.removeMapping(mapping)
return nil
// check if host object exists
err = s.cachedHostClient.Get(ctx, nameMapping.HostName, obj.DeepCopyObject().(client.Object))
if err == nil {
return true, nil
} else if !kerrors.IsNotFound(err) {
// TODO: filter out other allowed errors here could be Forbidden, Type not found etc.
klog.FromContext(ctx).Info("Error retrieving host object", "hostObject", nameMapping.Host().String())
}

return false, nil
}

func (s *Store) start(ctx context.Context) error {
Expand Down Expand Up @@ -204,6 +224,11 @@ func (s *Store) handleEvent(ctx context.Context, watchEvent BackendWatchResponse
}

for _, event := range watchEvent.Events {
// ignore events sent by us
if event.Mapping.Sender == s.sender {
continue
}

klog.FromContext(ctx).V(1).Info("mapping store received event", "type", event.Type, "mapping", event.Mapping.String())

// remove mapping in any case
Expand Down Expand Up @@ -245,16 +270,66 @@ func (s *Store) VirtualToHostName(_ context.Context, vObj synccontext.Object) (t
return pObjLookup.Object.NamespacedName, ok
}

func (s *Store) RecordAndSaveReference(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error {
err := s.RecordReference(ctx, nameMapping, belongsTo)
func (s *Store) DeleteReferenceAndSave(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error {
err := s.DeleteReference(ctx, nameMapping, belongsTo)
if err != nil {
return err
}

return s.SaveMapping(ctx, belongsTo)
}

func (s *Store) DeleteReference(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error {
// we don't record incomplete mappings
if nameMapping.Host().Empty() || nameMapping.Virtual().Empty() {
return nil
}

s.m.Lock()
defer s.m.Unlock()

// check if there is already a mapping
mapping, ok := s.findMapping(belongsTo)
if !ok {
return nil
}

// check if reference already exists
newReferences := make([]synccontext.NameMapping, 0, len(mapping.References)-1)
for _, reference := range mapping.References {
if reference.Equals(nameMapping) {
continue
}

newReferences = append(newReferences, reference)
}

// check if we found the reference
if len(newReferences) == len(mapping.References) {
return nil
}

// signal mapping was changed
mapping.References = newReferences
mapping.changed = true
klog.FromContext(ctx).Info("Delete mapping reference", "host", nameMapping.Host().String(), "virtual", nameMapping.Virtual().String(), "owner", mapping.Virtual().String())

// add to lookup maps
s.removeNameFromMaps(mapping, nameMapping.Virtual(), nameMapping.Host())
dispatchAll(s.watches[nameMapping.GroupVersionKind], nameMapping)
return nil
}

func (s *Store) AddReferenceAndSave(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error {
err := s.AddReference(ctx, nameMapping, belongsTo)
if err != nil {
return err
}

return s.SaveMapping(ctx, belongsTo)
}

func (s *Store) RecordReference(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error {
func (s *Store) AddReference(ctx context.Context, nameMapping, belongsTo synccontext.NameMapping) error {
// we don't record incomplete mappings
if nameMapping.Host().Empty() || nameMapping.Virtual().Empty() {
return nil
Expand Down Expand Up @@ -290,7 +365,7 @@ func (s *Store) RecordReference(ctx context.Context, nameMapping, belongsTo sync

// add mapping
mapping.changed = true
klog.FromContext(ctx).Info("Add name mapping", "host", nameMapping.Host().String(), "virtual", nameMapping.Virtual().String(), "owner", mapping.Virtual().String())
klog.FromContext(ctx).Info("Add mapping reference", "host", nameMapping.Host().String(), "virtual", nameMapping.Virtual().String(), "owner", mapping.Virtual().String())
mapping.References = append(mapping.References, nameMapping)

// add to lookup maps
Expand All @@ -316,6 +391,9 @@ func (s *Store) SaveMapping(ctx context.Context, nameMapping synccontext.NameMap
return nil
}

// set sender
mapping.Sender = s.sender

// save mapping
klog.FromContext(ctx).Info("Save object mappings in store", "mapping", mapping.String())
err := s.backend.Save(ctx, mapping)
Expand All @@ -327,15 +405,45 @@ func (s *Store) SaveMapping(ctx context.Context, nameMapping synccontext.NameMap
return nil
}

func (s *Store) ReferencesTo(ctx context.Context, vObj synccontext.Object) []synccontext.NameMapping {
func (s *Store) DeleteMapping(ctx context.Context, nameMapping synccontext.NameMapping) error {
// we ignore empty mappings here
if vObj.Empty() {
if nameMapping.Empty() {
return nil
}

s.m.Lock()
defer s.m.Unlock()

// check if there is already a mapping
mapping, ok := s.findMapping(nameMapping)
if !ok {
return nil
}

// delete the mapping
err := s.deleteMapping(ctx, mapping)
if err != nil {
return err
}

klog.FromContext(ctx).Info("Remove object mappings in store", "mapping", mapping.String())
return nil
}

func (s *Store) ReferencesTo(ctx context.Context, vObj synccontext.Object) []synccontext.NameMapping {
s.m.Lock()
defer s.m.Unlock()

retReferences := s.referencesTo(vObj)
klog.FromContext(ctx).V(1).Info("Found references for object", "object", vObj.String(), "references", len(retReferences))
return retReferences
}

func (s *Store) referencesTo(vObj synccontext.Object) []synccontext.NameMapping {
if vObj.Empty() {
return nil
}

hostNameLookup, ok := s.virtualToHostName[vObj]
if !ok {
return nil
Expand All @@ -356,7 +464,6 @@ func (s *Store) ReferencesTo(ctx context.Context, vObj synccontext.Object) []syn
retReferences = append(retReferences, reference.NameMapping)
}

klog.FromContext(ctx).V(1).Info("Found references for object", "object", vObj.String(), "references", len(retReferences))
return retReferences
}

Expand Down
Loading

0 comments on commit 7f688cc

Please sign in to comment.