From 958d7983ac43cd7c7495897b9abe32d553366f8c Mon Sep 17 00:00:00 2001 From: Matei David Date: Thu, 29 Jun 2023 11:27:40 +0100 Subject: [PATCH] Add leader election to the service-mirror controller (#11046) In order to support an HA mode for the service-mirror component, some form of synchronization should be used to coordinate between replicas of the service-mirror controller. Although in practice most of the updates done by the replicas are idempotent (and have benign effects on correctness), there are some downsides, such as: resource usage implications from setting-up multiple watches, log pollution, errors associated with writes on resources that out of date, and increased difficulty in debugging. This change adds coordination between the replicas through leader election. To achieve leader election, client-go's `coordination` package is used. The change refactors the existing code; the previous nested loops now reside in a closure (to capture the necessary configuration), and the closure is run when a leader is elected. Leader election functions as part of a loop: a lease resource is created (if it does not exist), and the controller blocks until it has acquired the lease. The loop is terminated only on shutdown from an interrupt signal. If the lease is lost, it is released, watchers are cleaned-up, and the controller returns to blocking until it acquires the lease once again. Shutdown logic has been changed to rely on context cancellation propagation so that the watchers may be ended either by the leader elector (when claim is lost) or by the main routine when an interrupt is handled. --------- Signed-off-by: Matei David Co-authored-by: Alejandro Pedraza --- .../templates/service-mirror.yaml | 3 + multicluster/cmd/service-mirror/main.go | 239 +++++++++++++----- .../testdata/serivce_mirror_default.golden | 3 + multicluster/cmd/unlink.go | 4 +- 4 files changed, 180 insertions(+), 69 deletions(-) diff --git a/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml b/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml index 33f1c0707e643..1e99a5522c269 100644 --- a/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml +++ b/multicluster/charts/linkerd-multicluster-link/templates/service-mirror.yaml @@ -52,6 +52,9 @@ rules: - apiGroups: ["multicluster.linkerd.io"] resources: ["links"] verbs: ["list", "get", "watch"] + - apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["create", "get", "update", "patch"] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/multicluster/cmd/service-mirror/main.go b/multicluster/cmd/service-mirror/main.go index b7de53adbf8cd..5194dcff785bb 100644 --- a/multicluster/cmd/service-mirror/main.go +++ b/multicluster/cmd/service-mirror/main.go @@ -21,9 +21,21 @@ import ( dynamic "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" ) -const linkWatchRestartAfter = 10 * time.Second +const ( + linkWatchRestartAfter = 10 * time.Second + // Duration of the lease + LEASE_DURATION = 30 * time.Second + // Deadline for the leader to refresh its lease. Defaults to the same value + // used by core controllers + LEASE_RENEW_DEADLINE = 10 * time.Second + // Duration leader elector clients should wait between action re-tries. + // Defaults to the same value used by core controllers + LEASE_RETRY_PERIOD = 2 * time.Second +) var ( clusterWatcher *servicemirror.RemoteClusterServiceWatcher @@ -55,8 +67,17 @@ func Main(args []string) { } }() + rootCtx, cancel := context.WithCancel(context.Background()) + stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + go func() { + <-stop + log.Info("Received shutdown signal") + // Cancel root context. Cancellation will be propagated to all other + // contexts that are children of the root context. + cancel() + }() // We create two different kubernetes API clients for the local cluster: // k8sAPI is used as a dynamic client for unstructured access to Link custom @@ -69,10 +90,8 @@ func Main(args []string) { if err != nil { log.Fatalf("Failed to initialize K8s API: %s", err) } - - ctx := context.Background() controllerK8sAPI, err := controllerK8s.InitializeAPI( - ctx, + rootCtx, *kubeConfigPath, false, controllerK8s.NS, @@ -84,79 +103,167 @@ func Main(args []string) { } linkClient := k8sAPI.DynamicClient.Resource(multicluster.LinkGVR).Namespace(*namespace) - metrics := servicemirror.NewProbeMetricVecs() - controllerK8sAPI.Sync(nil) ready = true - -main: - for { - // Start link watch - linkWatch, err := linkClient.Watch(ctx, metav1.ListOptions{}) - if err != nil { - log.Fatalf("Failed to watch Link %s: %s", linkName, err) - } - results := linkWatch.ResultChan() - - // Each time the link resource is updated, reload the config and restart the - // cluster watcher. + run := func(ctx context.Context) { + main: for { - select { - case <-stop: - break main - case event, ok := <-results: - if !ok { - log.Info("Link watch terminated; restarting watch") - continue main - } - switch obj := event.Object.(type) { - case *dynamic.Unstructured: - if obj.GetName() == linkName { - switch event.Type { - case watch.Added, watch.Modified: - link, err := multicluster.NewLink(*obj) - if err != nil { - log.Errorf("Failed to parse link %s: %s", linkName, err) - continue - } - log.Infof("Got updated link %s: %+v", linkName, link) - creds, err := loadCredentials(ctx, link, *namespace, k8sAPI) - if err != nil { - log.Errorf("Failed to load remote cluster credentials: %s", err) - } - err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc) - if err != nil { - // failed to restart cluster watcher; give a bit of slack - // and restart the link watch to give it another try - log.Error(err) - time.Sleep(linkWatchRestartAfter) - linkWatch.Stop() - } - case watch.Deleted: - log.Infof("Link %s deleted", linkName) - if clusterWatcher != nil { - clusterWatcher.Stop(false) - clusterWatcher = nil - } - if probeWorker != nil { - probeWorker.Stop() - probeWorker = nil + // Start link watch + linkWatch, err := linkClient.Watch(ctx, metav1.ListOptions{}) + if err != nil { + log.Fatalf("Failed to watch Link %s: %s", linkName, err) + } + results := linkWatch.ResultChan() + + // Each time the link resource is updated, reload the config and restart the + // cluster watcher. + for { + select { + // ctx.Done() is a one-shot channel that will be closed once + // the context has been cancelled. Receiving from a closed + // channel yields the value immediately. + case <-ctx.Done(): + // The channel will be closed by the leader elector when a + // lease is lost, or by a background task handling SIGTERM. + // Before terminating the loop, stop the workers and set + // them to nil to release memory. + cleanupWorkers() + return + case event, ok := <-results: + if !ok { + log.Info("Link watch terminated; restarting watch") + continue main + } + switch obj := event.Object.(type) { + case *dynamic.Unstructured: + if obj.GetName() == linkName { + switch event.Type { + case watch.Added, watch.Modified: + link, err := multicluster.NewLink(*obj) + if err != nil { + log.Errorf("Failed to parse link %s: %s", linkName, err) + continue + } + log.Infof("Got updated link %s: %+v", linkName, link) + creds, err := loadCredentials(ctx, link, *namespace, k8sAPI) + if err != nil { + log.Errorf("Failed to load remote cluster credentials: %s", err) + } + err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc) + if err != nil { + // failed to restart cluster watcher; give a bit of slack + // and restart the link watch to give it another try + log.Error(err) + time.Sleep(linkWatchRestartAfter) + linkWatch.Stop() + } + case watch.Deleted: + log.Infof("Link %s deleted", linkName) + cleanupWorkers() + default: + log.Infof("Ignoring event type %s", event.Type) } - default: - log.Infof("Ignoring event type %s", event.Type) } + default: + log.Errorf("Unknown object type detected: %+v", obj) } - default: - log.Errorf("Unknown object type detected: %+v", obj) } } } } + + hostname, found := os.LookupEnv("HOSTNAME") + if !found { + log.Fatal("Failed to fetch 'HOSTNAME' environment variable") + } + + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("service-mirror-write-%s", linkName), + Namespace: *namespace, + Labels: map[string]string{ + "component": "linkerd-service-mirror", + "mirror.linkerd.io/cluster-name": linkName, + }, + }, + Client: k8sAPI.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: hostname, + }, + } + +election: + for { + // RunOrDie will block until the lease is lost. + // + // When a lease is acquired, the OnStartedLeading callback will be + // triggered, and a main watcher loop will be established to watch Link + // resources. + // + // When the lease is lost, all watchers will be cleaned-up and we will + // loop then attempt to re-acquire the lease. + leaderelection.RunOrDie(rootCtx, leaderelection.LeaderElectionConfig{ + // When runtime context is cancelled, lock will be released. Implies any + // code guarded by the lease _must_ finish before cancelling. + ReleaseOnCancel: true, + Lock: lock, + LeaseDuration: LEASE_DURATION, + RenewDeadline: LEASE_RENEW_DEADLINE, + RetryPeriod: LEASE_RETRY_PERIOD, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + // When a lease is lost, RunOrDie will cancel the context + // passed into the OnStartedLeading callback. This will in + // turn cause us to cancel the work in the run() function, + // effectively terminating and cleaning-up the watches. + log.Info("Starting controller loop") + run(ctx) + }, + OnStoppedLeading: func() { + log.Infof("%s released lease", hostname) + }, + OnNewLeader: func(identity string) { + if identity == hostname { + log.Infof("%s acquired lease", hostname) + } + }, + }, + }) + + select { + // If the lease has been lost, and we have received a shutdown signal, + // break the loop and gracefully exit. We can guarantee at this point + // resources have been released. + case <-rootCtx.Done(): + break election + // If the lease has been lost, loop and attempt to re-acquire it. + default: + + } + } log.Info("Shutting down") } +// cleanupWorkers is a utility function that checks whether the worker pointers +// (clusterWatcher and probeWorker) are instantiated, and if they are, stops +// their execution and sets the pointers to a nil value so that memory may be +// garbage collected. +func cleanupWorkers() { + if clusterWatcher != nil { + // release, but do not clean-up services created + // the `unlink` command will take care of that + clusterWatcher.Stop(false) + clusterWatcher = nil + } + + if probeWorker != nil { + probeWorker.Stop() + probeWorker = nil + } +} + func loadCredentials(ctx context.Context, link multicluster.Link, namespace string, k8sAPI *k8s.KubernetesAPI) ([]byte, error) { // Load the credentials secret secret, err := k8sAPI.Interface.CoreV1().Secrets(namespace).Get(ctx, link.ClusterCredentialsSecret, metav1.GetOptions{}) @@ -177,12 +284,8 @@ func restartClusterWatcher( metrics servicemirror.ProbeMetricVecs, enableHeadlessSvc bool, ) error { - if clusterWatcher != nil { - clusterWatcher.Stop(false) - } - if probeWorker != nil { - probeWorker.Stop() - } + + cleanupWorkers() // Start probe worker workerMetrics, err := metrics.NewWorkerMetrics(link.TargetClusterName) diff --git a/multicluster/cmd/testdata/serivce_mirror_default.golden b/multicluster/cmd/testdata/serivce_mirror_default.golden index 8f8152e7fc3de..99ad7092b4fc5 100644 --- a/multicluster/cmd/testdata/serivce_mirror_default.golden +++ b/multicluster/cmd/testdata/serivce_mirror_default.golden @@ -49,6 +49,9 @@ rules: - apiGroups: ["multicluster.linkerd.io"] resources: ["links"] verbs: ["list", "get", "watch"] + - apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["create", "get", "update", "patch"] --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 diff --git a/multicluster/cmd/unlink.go b/multicluster/cmd/unlink.go index 16a346180b644..e46af19b84802 100644 --- a/multicluster/cmd/unlink.go +++ b/multicluster/cmd/unlink.go @@ -13,6 +13,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" appsv1 "k8s.io/api/apps/v1" + coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -67,10 +68,11 @@ func newUnlinkCommand() *cobra.Command { roleBinding := resource.NewNamespaced(rbac.SchemeGroupVersion.String(), "RoleBinding", fmt.Sprintf("linkerd-service-mirror-read-remote-creds-%s", opts.clusterName), opts.namespace) serviceAccount := resource.NewNamespaced(corev1.SchemeGroupVersion.String(), "ServiceAccount", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace) serviceMirror := resource.NewNamespaced(appsv1.SchemeGroupVersion.String(), "Deployment", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace) + lease := resource.NewNamespaced(coordinationv1.SchemeGroupVersion.String(), "Lease", fmt.Sprintf("service-mirror-write-%s", opts.clusterName), opts.namespace) resources := []resource.Kubernetes{ secret, gatewayMirror, link, clusterRole, clusterRoleBinding, - role, roleBinding, serviceAccount, serviceMirror, + role, roleBinding, serviceAccount, serviceMirror, lease, } selector := fmt.Sprintf("%s=%s,%s=%s",