Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[k8s] Fix logical race conditions in kubernetes_secrets provider #6623

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix logical race conditions in kubernetes_secrets provider

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6623

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/6340
19 changes: 11 additions & 8 deletions internal/pkg/composable/providers/kubernetessecrets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@ import (
"github.com/elastic/elastic-agent-autodiscover/kubernetes"
)

// Config for kubernetes provider
// Config for kubernetes_secrets provider
type Config struct {
KubeConfig string `config:"kube_config"`
KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"`

RefreshInterval time.Duration `config:"cache_refresh_interval"`
RefreshInterval time.Duration `config:"cache_refresh_interval" validate:"positive,nonzero"`
TTLDelete time.Duration `config:"cache_ttl"`
RequestTimeout time.Duration `config:"cache_request_timeout"`
RequestTimeout time.Duration `config:"cache_request_timeout" validate:"positive,nonzero"`
DisableCache bool `config:"cache_disable"`
}

func (c *Config) InitDefaults() {
c.RefreshInterval = 60 * time.Second
c.TTLDelete = 1 * time.Hour
c.RequestTimeout = 5 * time.Second
c.DisableCache = false
// defaultConfig returns default configuration for kubernetes_secrets provider
func defaultConfig() *Config {
return &Config{
RefreshInterval: 60 * time.Second,
TTLDelete: 1 * time.Hour,
RequestTimeout: 5 * time.Second,
DisableCache: false,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package kubernetessecrets

import (
"sync"
"time"
)

// expirationCache is a store that expires items after time.Now - secret.lastAccess > ttl (if ttl > 0) at Get or List.
// expirationCache works with *cacheEntry, a pointer struct that wraps secret, instead of secret directly because map
// structure in standard go library never removes the buckets from memory even after removing all the elements from it.
// However, since *cacheEntry is a pointer it can be garbage collected when no longer referenced by the GC, such as
// when deleted from the map. More importantly working with a pointer makes the entry in the map bucket, that doesn't
// get deallocated, to utilise only 8 bytes on a 64-bit system.
type expirationCache struct {
sync.Mutex
// ttl is the time-to-live for items in the cache
ttl time.Duration
// items is the underlying cache store.
items map[string]*cacheEntry
}

type cacheEntry struct {
s secret
lastAccess time.Time
}

// Get returns the secret associated with the given key from the store if it exists and is not expired. If updateAccess is true
// and the secret exists, essentially the expiration check is skipped and the lastAccess timestamp is updated to time.Now().
func (c *expirationCache) Get(key string, updateAccess bool) (secret, bool) {
c.Lock()
defer c.Unlock()

entry, exists := c.items[key]
if !exists {
return secret{}, false
}
if updateAccess {
entry.lastAccess = time.Now()
} else if c.isExpired(entry.lastAccess) {
delete(c.items, key)
return secret{}, false
}

return entry.s, true
}

// AddConditionally adds the given secret to the store if the given condition returns true. If there is no existing
// secret, the condition will be called with an empty secret and false. If updateAccess is true and the secret already exists,
// then the lastAccess timestamp is updated to time.Now() independently of the condition result.
func (c *expirationCache) AddConditionally(key string, in secret, updateAccess bool, condition conditionFn) {
c.Lock()
defer c.Unlock()
entry, exists := c.items[key]
if !exists {
if condition != nil && condition(secret{}, false) {
c.items[key] = &cacheEntry{in, time.Now()}
}
return
}

if condition != nil && condition(entry.s, true) {
entry.s = in
entry.lastAccess = time.Now()
} else if updateAccess {
entry.lastAccess = time.Now()
}
}

// isExpired returns true if the item has expired based on the ttl
func (c *expirationCache) isExpired(lastAccess time.Time) bool {
if c.ttl <= 0 {
// no expiration
return false
}
// we expire if the last access is older than the ttl
return time.Since(lastAccess) > c.ttl
}

// ListKeys returns a list of all the keys of the secrets in the store without checking for expiration
func (c *expirationCache) ListKeys() []string {
c.Lock()
defer c.Unlock()

length := len(c.items)
if length == 0 {
return nil
}
list := make([]string, 0, length)
for key := range c.items {
list = append(list, key)
}
return list
}

// List returns a list of all the secrets in the store that are not expired
func (c *expirationCache) List() []secret {
c.Lock()
defer c.Unlock()

length := len(c.items)
if length == 0 {
return nil
}
list := make([]secret, 0, length)
for _, entry := range c.items {
if c.isExpired(entry.lastAccess) {
continue
}
list = append(list, entry.s)
}
return list
}

// newExpirationCache creates and returns an expirationCache
func newExpirationCache(ttl time.Duration) *expirationCache {
return &expirationCache{
items: make(map[string]*cacheEntry),
ttl: ttl,
}
}
Loading
Loading