From 1c065263382314fe528fbf416c957e8473ca3918 Mon Sep 17 00:00:00 2001 From: haoranleo Date: Fri, 10 Jan 2025 14:17:11 -0800 Subject: [PATCH] Add rate limiter to the SA requests consumer --- pkg/cache/cache.go | 14 ++++++++++++-- pkg/cache/notifications.go | 1 + 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 5a24e0ac..5f7002b9 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -27,6 +27,7 @@ import ( "github.com/aws/amazon-eks-pod-identity-webhook/pkg" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -292,8 +293,9 @@ func New(defaultAudience, } } - // Rate limit to 10 concurrent requests against the API server. - saFetchRequests := make(chan *Request, 10) + // Allocate capacity large enough to not block writers (sync path in pod mutation). + // Rate limiting is done on the consumer side below. + saFetchRequests := make(chan *Request, 1000) c := &serviceAccountCache{ saCache: map[string]*Entry{}, cmCache: map[string]*Entry{}, @@ -307,9 +309,17 @@ func New(defaultAudience, notifications: newNotifications(saFetchRequests), } + // Rate limiting at 10 requests per second with burst to 20. + // In case the requests are queued in the channel for a period longer than the service-account-lookup-grace-period, + // the pod will not be mutated if the service account is also not synced by the informer cache before the service-account-lookup-grace-period. + // This is to avoid adding unlimited latency to the pod mutation time. The maximum latency would be the service-account-lookup-grace-period. 
+ rl := rate.NewLimiter(rate.Every(100*time.Millisecond), 20) go func() { for req := range saFetchRequests { go func() { + // Do rate limiting inside the goroutine: the goal is to consume the channel as fast as possible so that + // writers are not blocked, while still rate limiting the requests sent to the API server. + _ = rl.Wait(context.Background()) sa, err := fetchFromAPI(SAGetter, req) if err != nil { klog.Errorf("fetching SA: %s, but got error from API: %v", req.CacheKey(), err) diff --git a/pkg/cache/notifications.go b/pkg/cache/notifications.go index 4a628ee9..c0833db8 100644 --- a/pkg/cache/notifications.go +++ b/pkg/cache/notifications.go @@ -23,6 +23,7 @@ func (n *notifications) create(req Request) <-chan struct{} { n.mu.Lock() defer n.mu.Unlock() + // Deduplicate requests for the same SA namespace/name into a single request. notifier, found := n.handlers[req.CacheKey()] if !found { notifier = make(chan struct{})