Add leader election to the service-mirror controller (#11046)
In order to support an HA mode for the service-mirror component, some
form of synchronization is needed to coordinate between replicas of the
service-mirror controller. Although in practice most of the updates
done by the replicas are idempotent (and have benign effects on
correctness), there are downsides, such as: resource usage implications
from setting up multiple watches, log pollution, errors associated with
writes on resources that are out of date, and increased difficulty in
debugging.

This change adds coordination between the replicas through leader
election, built on client-go's `leaderelection` package and
`coordination.k8s.io` Lease resources. The change refactors the existing
code: the previous nested loops now reside in a closure (which captures
the necessary configuration), and the closure is run when a leader is
elected.
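
As a point of reference, the pattern boils down to roughly the following sketch (the package name, the `runWithLease` helper, and its parameters are illustrative and not part of the commit; the durations mirror the constants added in `main.go` below):

```go
package leaderdemo

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runWithLease blocks until the named Lease is acquired, runs `run` while
// the lease is held, and returns once the lease is lost or ctx is cancelled.
func runWithLease(ctx context.Context, client kubernetes.Interface, namespace, leaseName, identity string, run func(context.Context)) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: leaseName, Namespace: namespace},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: identity},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true, // release the lease when ctx is cancelled
		LeaseDuration:   30 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			// The closure holding the watch loops runs only while this
			// replica holds the lease; its context is cancelled on loss.
			OnStartedLeading: run,
			OnStoppedLeading: func() {},
		},
	})
}
```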

Leader election functions as part of a loop: a Lease resource is created
(if it does not exist), and the controller blocks until it has acquired
the lease. The loop terminates only on shutdown from an interrupt
signal. If the lease is lost, it is released, the watchers are cleaned
up, and the controller goes back to blocking until it acquires the lease
once again.
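
In outline, that loop has the following shape (a sketch only; `acquireAndRun` stands in for the `RunOrDie` call shown in the diff below):

```go
package leaderdemo

import "context"

// electLoop blocks in acquire/run cycles until rootCtx is cancelled.
// acquireAndRun is a stand-in for the RunOrDie call in the diff: it blocks
// until the lease is acquired, runs the watchers, and returns once the
// lease is lost or the context is cancelled.
func electLoop(rootCtx context.Context, acquireAndRun func(context.Context)) {
	for {
		acquireAndRun(rootCtx)

		select {
		case <-rootCtx.Done():
			// A shutdown signal was handled and the lease has already
			// been released; exit instead of re-acquiring.
			return
		default:
			// The lease was lost without a shutdown request: loop and
			// try to acquire it again.
		}
	}
}
```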

Shutdown logic has been changed to rely on context cancellation
propagation, so that the watchers may be stopped either by the leader
elector (when the claim is lost) or by the main routine when an
interrupt is handled.
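
For comparison, the same wiring can be expressed with `signal.NotifyContext` (a minimal sketch; the commit achieves the equivalent with `signal.Notify` plus a goroutine that cancels the root context, and `runController` here is a placeholder):

```go
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// rootCtx is cancelled when SIGINT or SIGTERM is received; everything
	// derived from it (the leader elector and the Link/service watchers)
	// observes the cancellation and shuts down.
	rootCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	runController(rootCtx)
}

// runController stands in for the leader-election and watch loops.
func runController(ctx context.Context) {
	<-ctx.Done()
}
```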

---------

Signed-off-by: Matei David <[email protected]>
Co-authored-by: Alejandro Pedraza <[email protected]>
mateiidavid and alpeb authored Jun 29, 2023
1 parent 68d6789 commit 958d798
Showing 4 changed files with 180 additions and 69 deletions.
@@ -52,6 +52,9 @@ rules:
- apiGroups: ["multicluster.linkerd.io"]
resources: ["links"]
verbs: ["list", "get", "watch"]
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["create", "get", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
239 changes: 171 additions & 68 deletions multicluster/cmd/service-mirror/main.go
@@ -21,9 +21,21 @@ import (
dynamic "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)

const linkWatchRestartAfter = 10 * time.Second
const (
linkWatchRestartAfter = 10 * time.Second
// Duration of the lease
LEASE_DURATION = 30 * time.Second
// Deadline for the leader to refresh its lease. Defaults to the same value
// used by core controllers
LEASE_RENEW_DEADLINE = 10 * time.Second
// Duration leader elector clients should wait between action re-tries.
// Defaults to the same value used by core controllers
LEASE_RETRY_PERIOD = 2 * time.Second
)

var (
clusterWatcher *servicemirror.RemoteClusterServiceWatcher
@@ -55,8 +67,17 @@ func Main(args []string) {
}
}()

rootCtx, cancel := context.WithCancel(context.Background())

stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
go func() {
<-stop
log.Info("Received shutdown signal")
// Cancel root context. Cancellation will be propagated to all other
// contexts that are children of the root context.
cancel()
}()

// We create two different kubernetes API clients for the local cluster:
// k8sAPI is used as a dynamic client for unstructured access to Link custom
@@ -69,10 +90,8 @@
if err != nil {
log.Fatalf("Failed to initialize K8s API: %s", err)
}

ctx := context.Background()
controllerK8sAPI, err := controllerK8s.InitializeAPI(
ctx,
rootCtx,
*kubeConfigPath,
false,
controllerK8s.NS,
@@ -84,79 +103,167 @@
}

linkClient := k8sAPI.DynamicClient.Resource(multicluster.LinkGVR).Namespace(*namespace)

metrics := servicemirror.NewProbeMetricVecs()

controllerK8sAPI.Sync(nil)

ready = true

main:
for {
// Start link watch
linkWatch, err := linkClient.Watch(ctx, metav1.ListOptions{})
if err != nil {
log.Fatalf("Failed to watch Link %s: %s", linkName, err)
}
results := linkWatch.ResultChan()

// Each time the link resource is updated, reload the config and restart the
// cluster watcher.
run := func(ctx context.Context) {
main:
for {
select {
case <-stop:
break main
case event, ok := <-results:
if !ok {
log.Info("Link watch terminated; restarting watch")
continue main
}
switch obj := event.Object.(type) {
case *dynamic.Unstructured:
if obj.GetName() == linkName {
switch event.Type {
case watch.Added, watch.Modified:
link, err := multicluster.NewLink(*obj)
if err != nil {
log.Errorf("Failed to parse link %s: %s", linkName, err)
continue
}
log.Infof("Got updated link %s: %+v", linkName, link)
creds, err := loadCredentials(ctx, link, *namespace, k8sAPI)
if err != nil {
log.Errorf("Failed to load remote cluster credentials: %s", err)
}
err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc)
if err != nil {
// failed to restart cluster watcher; give a bit of slack
// and restart the link watch to give it another try
log.Error(err)
time.Sleep(linkWatchRestartAfter)
linkWatch.Stop()
}
case watch.Deleted:
log.Infof("Link %s deleted", linkName)
if clusterWatcher != nil {
clusterWatcher.Stop(false)
clusterWatcher = nil
}
if probeWorker != nil {
probeWorker.Stop()
probeWorker = nil
// Start link watch
linkWatch, err := linkClient.Watch(ctx, metav1.ListOptions{})
if err != nil {
log.Fatalf("Failed to watch Link %s: %s", linkName, err)
}
results := linkWatch.ResultChan()

// Each time the link resource is updated, reload the config and restart the
// cluster watcher.
for {
select {
// ctx.Done() is a one-shot channel that will be closed once
// the context has been cancelled. Receiving from a closed
// channel yields the value immediately.
case <-ctx.Done():
// The channel will be closed by the leader elector when a
// lease is lost, or by a background task handling SIGTERM.
// Before terminating the loop, stop the workers and set
// them to nil to release memory.
cleanupWorkers()
return
case event, ok := <-results:
if !ok {
log.Info("Link watch terminated; restarting watch")
continue main
}
switch obj := event.Object.(type) {
case *dynamic.Unstructured:
if obj.GetName() == linkName {
switch event.Type {
case watch.Added, watch.Modified:
link, err := multicluster.NewLink(*obj)
if err != nil {
log.Errorf("Failed to parse link %s: %s", linkName, err)
continue
}
log.Infof("Got updated link %s: %+v", linkName, link)
creds, err := loadCredentials(ctx, link, *namespace, k8sAPI)
if err != nil {
log.Errorf("Failed to load remote cluster credentials: %s", err)
}
err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc)
if err != nil {
// failed to restart cluster watcher; give a bit of slack
// and restart the link watch to give it another try
log.Error(err)
time.Sleep(linkWatchRestartAfter)
linkWatch.Stop()
}
case watch.Deleted:
log.Infof("Link %s deleted", linkName)
cleanupWorkers()
default:
log.Infof("Ignoring event type %s", event.Type)
}
default:
log.Infof("Ignoring event type %s", event.Type)
}
default:
log.Errorf("Unknown object type detected: %+v", obj)
}
default:
log.Errorf("Unknown object type detected: %+v", obj)
}
}
}
}

hostname, found := os.LookupEnv("HOSTNAME")
if !found {
log.Fatal("Failed to fetch 'HOSTNAME' environment variable")
}

lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("service-mirror-write-%s", linkName),
Namespace: *namespace,
Labels: map[string]string{
"component": "linkerd-service-mirror",
"mirror.linkerd.io/cluster-name": linkName,
},
},
Client: k8sAPI.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: hostname,
},
}

election:
for {
// RunOrDie will block until the lease is lost.
//
// When a lease is acquired, the OnStartedLeading callback will be
// triggered, and a main watcher loop will be established to watch Link
// resources.
//
// When the lease is lost, all watchers will be cleaned up and we will
// loop and attempt to re-acquire the lease.
leaderelection.RunOrDie(rootCtx, leaderelection.LeaderElectionConfig{
// When runtime context is cancelled, lock will be released. Implies any
// code guarded by the lease _must_ finish before cancelling.
ReleaseOnCancel: true,
Lock: lock,
LeaseDuration: LEASE_DURATION,
RenewDeadline: LEASE_RENEW_DEADLINE,
RetryPeriod: LEASE_RETRY_PERIOD,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// When a lease is lost, RunOrDie will cancel the context
// passed into the OnStartedLeading callback. This will in
// turn cause us to cancel the work in the run() function,
// effectively terminating and cleaning-up the watches.
log.Info("Starting controller loop")
run(ctx)
},
OnStoppedLeading: func() {
log.Infof("%s released lease", hostname)
},
OnNewLeader: func(identity string) {
if identity == hostname {
log.Infof("%s acquired lease", hostname)
}
},
},
})

select {
// If the lease has been lost, and we have received a shutdown signal,
// break the loop and gracefully exit. We can guarantee at this point
// resources have been released.
case <-rootCtx.Done():
break election
// If the lease has been lost, loop and attempt to re-acquire it.
default:

}
}
log.Info("Shutting down")
}

// cleanupWorkers is a utility function that checks whether the worker pointers
// (clusterWatcher and probeWorker) are instantiated, and if they are, stops
// their execution and sets the pointers to a nil value so that memory may be
// garbage collected.
func cleanupWorkers() {
if clusterWatcher != nil {
// release, but do not clean-up services created
// the `unlink` command will take care of that
clusterWatcher.Stop(false)
clusterWatcher = nil
}

if probeWorker != nil {
probeWorker.Stop()
probeWorker = nil
}
}

func loadCredentials(ctx context.Context, link multicluster.Link, namespace string, k8sAPI *k8s.KubernetesAPI) ([]byte, error) {
// Load the credentials secret
secret, err := k8sAPI.Interface.CoreV1().Secrets(namespace).Get(ctx, link.ClusterCredentialsSecret, metav1.GetOptions{})
@@ -177,12 +284,8 @@ func restartClusterWatcher(
metrics servicemirror.ProbeMetricVecs,
enableHeadlessSvc bool,
) error {
if clusterWatcher != nil {
clusterWatcher.Stop(false)
}
if probeWorker != nil {
probeWorker.Stop()
}

cleanupWorkers()

// Start probe worker
workerMetrics, err := metrics.NewWorkerMetrics(link.TargetClusterName)
3 changes: 3 additions & 0 deletions multicluster/cmd/testdata/serivce_mirror_default.golden

4 changes: 3 additions & 1 deletion multicluster/cmd/unlink.go
@@ -13,6 +13,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
appsv1 "k8s.io/api/apps/v1"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -67,10 +68,11 @@ func newUnlinkCommand() *cobra.Command {
roleBinding := resource.NewNamespaced(rbac.SchemeGroupVersion.String(), "RoleBinding", fmt.Sprintf("linkerd-service-mirror-read-remote-creds-%s", opts.clusterName), opts.namespace)
serviceAccount := resource.NewNamespaced(corev1.SchemeGroupVersion.String(), "ServiceAccount", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace)
serviceMirror := resource.NewNamespaced(appsv1.SchemeGroupVersion.String(), "Deployment", fmt.Sprintf("linkerd-service-mirror-%s", opts.clusterName), opts.namespace)
lease := resource.NewNamespaced(coordinationv1.SchemeGroupVersion.String(), "Lease", fmt.Sprintf("service-mirror-write-%s", opts.clusterName), opts.namespace)

resources := []resource.Kubernetes{
secret, gatewayMirror, link, clusterRole, clusterRoleBinding,
role, roleBinding, serviceAccount, serviceMirror,
role, roleBinding, serviceAccount, serviceMirror, lease,
}

selector := fmt.Sprintf("%s=%s,%s=%s",
