From ee09ca50526a518aef4e2302b862d1fb87365d9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=AF=93?= Date: Mon, 16 Dec 2024 15:19:19 +0800 Subject: [PATCH] optimize: add probe for cluster status --- cmd/controller-manager/app/options/options.go | 8 ++ cmd/controller-manager/app/util.go | 1 + pkg/controllers/context/context.go | 1 + .../federatedcluster/cluster_status_cache.go | 103 ++++++++++++++++++ .../federatedcluster/clusterstatus.go | 5 +- .../federatedcluster/controller.go | 5 + 6 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 pkg/controllers/federatedcluster/cluster_status_cache.go diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 8b982a01..75c36d98 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -55,6 +55,7 @@ type Options struct { NSAutoPropExcludeRegexp string ClusterJoinTimeout time.Duration + ClusterStatusThreshold time.Duration MemberObjectEnqueueDelay time.Duration MaxPodListers int64 @@ -174,6 +175,13 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string, disabl time.Second*30, "The period of health check for member clusters. The minimum value is "+MinClusterHealthCheckPeriod.String()+".", ) + + flags.DurationVar( + &o.ClusterStatusThreshold, + "cluster-status-threshold", + time.Second*100, + "The threshold of member clusters status change.", + ) } func (o *Options) addKlogFlags(flags *pflag.FlagSet) { diff --git a/cmd/controller-manager/app/util.go b/cmd/controller-manager/app/util.go index 630f6b0d..5d913f93 100644 --- a/cmd/controller-manager/app/util.go +++ b/cmd/controller-manager/app/util.go @@ -185,6 +185,7 @@ func getComponentConfig(opts *options.Options) (*controllercontext.ComponentConf MemberObjectEnqueueDelay: opts.MemberObjectEnqueueDelay, EnableKatalystSupport: opts.EnableKatalystSupport, ClusterHealthCheckPeriod: opts.ClusterHealthCheckPeriod, + ClusterStatusThreshold: opts.ClusterStatusThreshold, } if opts.ClusterHealthCheckPeriod < options.MinClusterHealthCheckPeriod { diff --git a/pkg/controllers/context/context.go b/pkg/controllers/context/context.go index 564f06af..d02ad983 100644 --- a/pkg/controllers/context/context.go +++ b/pkg/controllers/context/context.go @@ -85,4 +85,5 @@ type ComponentConfig struct { ResourceAggregationNodeFilter []labels.Selector EnableKatalystSupport bool ClusterHealthCheckPeriod time.Duration + ClusterStatusThreshold time.Duration } diff --git a/pkg/controllers/federatedcluster/cluster_status_cache.go b/pkg/controllers/federatedcluster/cluster_status_cache.go new file mode 100644 index 00000000..4f4f08d5 --- /dev/null +++ b/pkg/controllers/federatedcluster/cluster_status_cache.go @@ -0,0 +1,103 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This file may have been modified by The KubeAdmiral Authors +("KubeAdmiral Modifications"). All KubeAdmiral Modifications +are Copyright 2023 The KubeAdmiral Authors. +*/ + +package federatedcluster + +import ( + "context" + "sync" + "time" + + "k8s.io/klog/v2" + + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" +) + +type clusterStatusStore struct { + clusterStatusData sync.Map + clusterStatusThreshold time.Duration +} + +type clusterStatusConditionData struct { + offlineCondition fedcorev1a1.ClusterCondition + readyCondition fedcorev1a1.ClusterCondition + probeTimestamp time.Time +} + +func (c *clusterStatusStore) thresholdAdjustedStatusCondition( + ctx context.Context, + cluster *fedcorev1a1.FederatedCluster, + observedOfflineCondition fedcorev1a1.ClusterCondition, + observedReadyCondition fedcorev1a1.ClusterCondition, +) (fedcorev1a1.ClusterCondition, fedcorev1a1.ClusterCondition) { + logger := klog.FromContext(ctx) + + saved := c.get(cluster.Name) + if saved == nil { + // the cluster is just joined + c.update(cluster.Name, &clusterStatusConditionData{ + offlineCondition: observedOfflineCondition, + readyCondition: observedReadyCondition, + }) + return observedOfflineCondition, observedReadyCondition + } + curOfflineCondition := getClusterCondition(&cluster.Status, fedcorev1a1.ClusterOffline) + curReadyCondition := getClusterCondition(&cluster.Status, fedcorev1a1.ClusterReady) + if curOfflineCondition == nil || curReadyCondition == nil { + return observedOfflineCondition, observedReadyCondition + } + + now := time.Now() + if saved.offlineCondition.Status != observedOfflineCondition.Status || saved.readyCondition.Status != observedReadyCondition.Status { + // condition status changed, record the probe timestamp + saved = &clusterStatusConditionData{ + offlineCondition: observedOfflineCondition, + readyCondition: observedReadyCondition, + probeTimestamp: now, + } + c.update(cluster.Name, saved) + } + + if curOfflineCondition.Status != observedOfflineCondition.Status || curReadyCondition.Status != observedReadyCondition.Status { + // threshold not exceeded, return the old status condition + if now.Before(saved.probeTimestamp.Add(c.clusterStatusThreshold)) { + logger.V(3).WithValues("offline", curOfflineCondition.Status, "ready", curReadyCondition.Status). + Info("Threshold not exceeded, return the old status condition") + return *curOfflineCondition, *curReadyCondition + } + + logger.V(3).WithValues("offline", observedOfflineCondition.Status, "ready", observedReadyCondition.Status). + Info("Cluster status condition changed") + } + + return observedOfflineCondition, observedReadyCondition +} + +func (c *clusterStatusStore) get(cluster string) *clusterStatusConditionData { + condition, ok := c.clusterStatusData.Load(cluster) + if !ok { + return nil + } + return condition.(*clusterStatusConditionData) +} + +func (c *clusterStatusStore) update(cluster string, data *clusterStatusConditionData) { + c.clusterStatusData.Store(cluster, data) +} diff --git a/pkg/controllers/federatedcluster/clusterstatus.go b/pkg/controllers/federatedcluster/clusterstatus.go index dfd12af0..c718dabf 100644 --- a/pkg/controllers/federatedcluster/clusterstatus.go +++ b/pkg/controllers/federatedcluster/clusterstatus.go @@ -298,8 +298,11 @@ func (c *FederatedClusterController) collectIndividualClusterStatus( } offlineCondition := getNewClusterOfflineCondition(offlineStatus, conditionTime) - setClusterCondition(&cluster.Status, &offlineCondition) readyCondition := getNewClusterReadyCondition(readyStatus, readyReason, readyMessage, conditionTime) + + offlineCondition, readyCondition = c.clusterStatusCache.thresholdAdjustedStatusCondition(ctx, cluster, offlineCondition, readyCondition) + + setClusterCondition(&cluster.Status, &offlineCondition) setClusterCondition(&cluster.Status, &readyCondition) if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { diff --git a/pkg/controllers/federatedcluster/controller.go b/pkg/controllers/federatedcluster/controller.go index d65143f3..b3b380a2 100644 --- a/pkg/controllers/federatedcluster/controller.go +++ b/pkg/controllers/federatedcluster/controller.go @@ -85,6 +85,7 @@ type FederatedClusterController struct { clusterHealthCheckConfig *ClusterHealthCheckConfig clusterJoinTimeout time.Duration resourceAggregationNodeFilter []labels.Selector + clusterStatusCache clusterStatusStore lock sync.Mutex clusterConnectionHashes map[string]string @@ -120,6 +121,10 @@ func NewFederatedClusterController( clusterHealthCheckConfig: &ClusterHealthCheckConfig{ Period: componentConfig.ClusterHealthCheckPeriod, }, + clusterStatusCache: clusterStatusStore{ + clusterStatusData: sync.Map{}, + clusterStatusThreshold: componentConfig.ClusterStatusThreshold, + }, lock: sync.Mutex{}, clusterConnectionHashes: map[string]string{},