Skip to content

Commit

Permalink
feat: support dry-run
Browse files Browse the repository at this point in the history
Signed-off-by: shentiecheng <[email protected]>
  • Loading branch information
Poor12 committed Nov 20, 2023
1 parent 6bfa139 commit d3b5175
Show file tree
Hide file tree
Showing 14 changed files with 606 additions and 2 deletions.
4 changes: 4 additions & 0 deletions pkg/controllers/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ const (
TemplateGeneratorMergePatchAnnotation = FederateControllerPrefix + "template-generator-merge-patch"

LatestReplicasetDigestsAnnotation = DefaultPrefix + "latest-replicaset-digests"

// NoSyncAnnotation indicates skip syncing. This annotation should only be added manually and temporarily in particular cases.
// It works only when workloads have not been propagated to member clusters.
NoSyncAnnotation = DefaultPrefix + "no-sync"
)

// PropagatedAnnotationKeys and PropagatedLabelKeys are used to store the keys of annotations and labels that are present
Expand Down
48 changes: 48 additions & 0 deletions pkg/controllers/federate/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -39,6 +40,7 @@ import (
fedclient "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned"
fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/sourcefeedback"
"github.com/kubewharf/kubeadmiral/pkg/stats"
"github.com/kubewharf/kubeadmiral/pkg/stats/metrics"
"github.com/kubewharf/kubeadmiral/pkg/util/eventhandlers"
Expand Down Expand Up @@ -407,6 +409,11 @@ func (c *FederateController) reconcile(ctx context.Context, key workerKey) (stat
)
}

if err := c.updateFeedbackAnnotations(ctx, sourceObject, fedObject, ftc); err != nil {
// do not retry here even on conflict, just reconcile later
return worker.StatusError
}

return worker.StatusAllOK
}

Expand Down Expand Up @@ -562,3 +569,44 @@ func (c *FederateController) handleExistingFederatedObject(

return true, nil
}

func (c *FederateController) updateFeedbackAnnotations(ctx context.Context, sourceObject *unstructured.Unstructured,
fedObject fedcorev1a1.GenericFederatedObject, ftc *fedcorev1a1.FederatedTypeConfig,
) error {
logger := klog.FromContext(ctx)

hasChanged := false
needDelete := false

if fedObject.GetAnnotations()[common.NoSyncAnnotation] == common.AnnotationValueTrue {
if err := sourcefeedback.PopulateSchedulingAnnotation(sourceObject, fedObject, ftc, &hasChanged); err != nil {
return fmt.Errorf("failed to populate scheduling annotation: %w", err)
}
} else {
if _, exists := sourceObject.GetAnnotations()[sourcefeedback.SchedulingAnnotation]; exists {
annotations := sourceObject.GetAnnotations()
delete(annotations, sourcefeedback.SchedulingAnnotation)
sourceObject.SetAnnotations(annotations)
needDelete = true
}
}

if hasChanged || needDelete {
logger.V(3).Info("Updating source object")
resourceClient := c.dynamicClient.Resource(ftc.GetSourceTypeGVR()).Namespace(sourceObject.GetNamespace())

var err error
if ftc.GetSourceType().Group == appsv1.GroupName && ftc.GetSourceType().Name == "deployments" && !needDelete {
// deployment bumps generation if annotations are updated
_, err = resourceClient.UpdateStatus(ctx, sourceObject, metav1.UpdateOptions{})
} else {
_, err = resourceClient.Update(ctx, sourceObject, metav1.UpdateOptions{})
}

if err != nil {
return fmt.Errorf("failed to update source object: %w", err)
}
}

return nil
}
104 changes: 104 additions & 0 deletions pkg/controllers/federate/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package federate

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic/fake"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler"
)

var (
basicDeploymentTemplate = &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": map[string]interface{}{
"name": "deployment-test",
"namespace": "default",
"labels": map[string]interface{}{
"test": "test",
},
"generation": int64(1),
"annotations": map[string]interface{}{
"kubeadmiral.io/no-sync": "true",
"kubeadmiral.io/scheduling": "{\"generation\":1,\"fedGeneration\":1,\"placement\":[\"cluster1\"]}",
},
},
"spec": map[string]interface{}{
"replicas": int64(1),
"template": map[string]interface{}{
"spec": map[string]interface{}{
"containers": []interface{}{},
},
},
},
},
}
)

func generateFedObj(workload *unstructured.Unstructured) *fedcorev1a1.FederatedObject {
rawTargetTemplate, _ := workload.MarshalJSON()
return &fedcorev1a1.FederatedObject{
ObjectMeta: metav1.ObjectMeta{
Generation: 1,
},
Spec: fedcorev1a1.GenericFederatedObjectSpec{
Template: apiextensionsv1.JSON{Raw: rawTargetTemplate},
},
}
}

func Test_FederateController_updateFeedbackAnnotations(t *testing.T) {
sourceObj := basicDeploymentTemplate
fedObject := generateFedObj(basicDeploymentTemplate)
fedObject.GetSpec().Placements = []fedcorev1a1.PlacementWithController{
{
Controller: scheduler.PrefixedGlobalSchedulerName,
Placement: []fedcorev1a1.ClusterReference{
{
Cluster: "cluster1",
},
},
},
}
ftc := &fedcorev1a1.FederatedTypeConfig{
Spec: fedcorev1a1.FederatedTypeConfigSpec{
SourceType: fedcorev1a1.APIResource{
Group: "apps",
Version: "v1",
Kind: "Deployment",
PluralName: "deployments",
Scope: "Namespaced",
},
PathDefinition: fedcorev1a1.PathDefinition{
ReplicasSpec: "spec.replicas",
},
},
}

scheme := runtime.NewScheme()
err := corev1.AddToScheme(scheme)
assert.Equal(t, nil, err)
err = appsv1.AddToScheme(scheme)
assert.Equal(t, nil, err)

dynamicClient := fake.NewSimpleDynamicClient(scheme, sourceObj)
f := &FederateController{dynamicClient: dynamicClient}
err = f.updateFeedbackAnnotations(context.Background(), sourceObj, fedObject, ftc)
assert.Equal(t, nil, err)
annotations := sourceObj.GetAnnotations()
delete(annotations, "kubeadmiral.io/no-sync")
sourceObj.SetAnnotations(annotations)
err = f.updateFeedbackAnnotations(context.Background(), sourceObj, fedObject, ftc)
assert.Equal(t, nil, err)
}
3 changes: 3 additions & 0 deletions pkg/controllers/federate/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/kubewharf/kubeadmiral/pkg/controllers/nsautoprop"
"github.com/kubewharf/kubeadmiral/pkg/controllers/override"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/sourcefeedback"
"github.com/kubewharf/kubeadmiral/pkg/util/adoption"
annotationutil "github.com/kubewharf/kubeadmiral/pkg/util/annotation"
"github.com/kubewharf/kubeadmiral/pkg/util/naming"
Expand Down Expand Up @@ -265,11 +266,13 @@ var (
scheduler.FollowsObjectAnnotation,
common.FollowersAnnotation,
common.DisableFollowingAnnotation,
common.NoSyncAnnotation,
)

// List of annotations that should be ignored on the source object
ignoredAnnotationSet = sets.New(
common.LatestReplicasetDigestsAnnotation,
sourcefeedback.SchedulingAnnotation,
)

federatedLabelSet = sets.New[string](
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/scheduler/schedulingtriggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ var knownSchedulingAnnotations = sets.New(
AffinityAnnotations,
MaxClustersAnnotations,
FollowsObjectAnnotation,
common.NoSyncAnnotation,
)

func getSchedulingAnnotationsHash(fedObject fedcorev1a1.GenericFederatedObject) (string, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/scheduler/schedulingunit.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func schedulingUnitForFedObject(
desiredReplicasOption = value
}

currentReplicas, err := getCurrentReplicasFromObject(typeConfig, fedObject)
currentReplicas, err := GetCurrentReplicasFromObject(typeConfig, fedObject)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func schedulingUnitForFedObject(
return schedulingUnit, nil
}

func getCurrentReplicasFromObject(
func GetCurrentReplicasFromObject(
ftc *fedcorev1a1.FederatedTypeConfig,
fedObject fedcorev1a1.GenericFederatedObject,
) (map[string]*int64, error) {
Expand Down
15 changes: 15 additions & 0 deletions pkg/controllers/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,12 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua
fedResource.RecordError("EnsureFinalizerError", errors.Wrap(err, "Failed to ensure finalizer"))
return worker.StatusError
}

if skipSync(fedResource.Object()) {
fedResource.RecordEvent("SyncSkipped", "Skip Syncing for %s", fedResource.FederatedName())
return worker.StatusAllOK
}

clustersToSync, selectedClusters, err := s.prepareToSync(ctx, fedResource)
if err != nil {
fedResource.RecordError("PrepareToSyncError", errors.Wrap(err, "Failed to prepare to sync"))
Expand Down Expand Up @@ -1294,3 +1300,12 @@ func convertSyncMapToMap(syncMap *sync.Map) map[interface{}]interface{} {

return normalMap
}

func skipSync(federatedObject fedcorev1a1.GenericFederatedObject) bool {
// if workloads have been propagated to member clusters,
// sync is always not skipped.
if len(federatedObject.GetStatus().Clusters) > 0 {
return false
}
return federatedObject.GetAnnotations()[common.NoSyncAnnotation] == common.AnnotationValueTrue
}
29 changes: 29 additions & 0 deletions pkg/controllers/sync/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package sync

import (
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
)

func Test_skipSync(t *testing.T) {
federatedObject := &fedcorev1a1.FederatedObject{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{common.NoSyncAnnotation: common.AnnotationValueTrue},
},
}
assert.True(t, skipSync(federatedObject))
federatedObject.GetAnnotations()[common.NoSyncAnnotation] = common.AnnotationValueFalse
assert.False(t, skipSync(federatedObject))
federatedObject.GetAnnotations()[common.NoSyncAnnotation] = common.AnnotationValueTrue
federatedObject.GetStatus().Clusters = []fedcorev1a1.PropagationStatus{
{
Cluster: "cluster1",
},
}
assert.False(t, skipSync(federatedObject))
}
83 changes: 83 additions & 0 deletions pkg/controllers/util/sourcefeedback/scheduling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2023 The KubeAdmiral Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sourcefeedback

import (
"sort"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/utils/pointer"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler"
"github.com/kubewharf/kubeadmiral/pkg/util/meta"
)

var SchedulingAnnotation = common.DefaultPrefix + "scheduling"

type Scheduling struct {
// Generation is the generation of the source object
// observed in the federated object when this placement is sampled.
// This value should not be null unless in the condition
// where the federated object is manually created by another controller.
Generation *int64 `json:"generation"`

// FederatedGeneration is the generation of the federated object
// observed when this placement is sampled.
FederatedGeneration int64 `json:"fedGeneration"`

// Placement contains a list of FederatedCluster object names.
Placement []string `json:"placement,omitempty"`

// PlacementWithReplicas contains a list of FederatedCluster object names and replicas on them.
PlacementWithReplicas map[string]*int64 `json:"placementWithReplicas,omitempty"`
}

func PopulateSchedulingAnnotation(sourceObject *unstructured.Unstructured, fedObject fedcorev1a1.GenericFederatedObject,
ftc *fedcorev1a1.FederatedTypeConfig, hasChanged *bool,
) (err error) {
scheduling := Scheduling{}

srcMeta, err := meta.GetSourceObjectMeta(fedObject)
if err != nil {
return err
}

scheduling.Generation = pointer.Int64(srcMeta.GetGeneration())
scheduling.FederatedGeneration = fedObject.GetGeneration()

overrides := fedObject.GetSpec().GetControllerOverrides(scheduler.PrefixedGlobalSchedulerName)
if len(overrides) == 0 {
clusterNames := fedObject.GetSpec().GetPlacementUnion()
if len(clusterNames) > 0 {
for clusterName := range clusterNames {
scheduling.Placement = append(scheduling.Placement, clusterName)
}
sort.Strings(scheduling.Placement)
}
} else {
placementWithReplicas, err := scheduler.GetCurrentReplicasFromObject(ftc, fedObject)
if err != nil {
return err
}
scheduling.PlacementWithReplicas = placementWithReplicas
}

setAnnotation(sourceObject, SchedulingAnnotation, &scheduling, hasChanged)
return nil
}
Loading

0 comments on commit d3b5175

Please sign in to comment.