Only watch metadata for ReplicaSets in metricbeat k8s module
swiatekm committed Oct 7, 2024
1 parent 2f252f2 commit fca4974
Showing 3 changed files with 59 additions and 18 deletions.
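Note: this change switches the ReplicaSet watcher in the kubernetes module from a full-object watcher to a metadata-only one, so only each ReplicaSet's object metadata (name, namespace, labels, owner references) is requested and cached, never its spec or status. The short sketch below is illustration rather than Beats code and uses invented values; it shows what a metadata-only watch delivers:

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // Sketch only: what a metadata-only watch delivers for a ReplicaSet. Instead of a
    // full appsv1.ReplicaSet, the API server returns a PartialObjectMetadata carrying
    // just TypeMeta and ObjectMeta; spec and status never leave the API server.
    rs := metav1.PartialObjectMetadata{
        TypeMeta: metav1.TypeMeta{APIVersion: "apps/v1", Kind: "ReplicaSet"},
        ObjectMeta: metav1.ObjectMeta{
            Name:      "nginx-7c79c4bf97",
            Namespace: "default",
            Labels:    map[string]string{"app": "nginx"},
            OwnerReferences: []metav1.OwnerReference{
                {APIVersion: "apps/v1", Kind: "Deployment", Name: "nginx"},
            },
        },
    }
    fmt.Printf("%s/%s owned by %s\n", rs.Namespace, rs.Name, rs.OwnerReferences[0].Name)
}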
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -363,6 +363,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added Cisco Meraki module {pull}40836[40836]
- Added Palo Alto Networks module {pull}40686[40686]
- Restore docker.network.in.* and docker.network.out.* fields in docker module {pull}40968[40968]
- Only watch metadata for ReplicaSets in metricbeat k8s module {pull}41100[41100]

*Metricbeat*

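For reference, kubernetes.NewNamedMetadataWatcher used in the diff below comes from elastic-agent-autodiscover and is not defined in this commit; under the hood it relies on client-go's metadata-only client and informer. The following standalone sketch shows that underlying mechanism directly with client-go; the in-cluster configuration, resync period, and printed output are assumptions for illustration:

package main

import (
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/metadata"
    "k8s.io/client-go/metadata/metadatainformer"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // Assumes the process runs inside a cluster; use clientcmd with a kubeconfig otherwise.
    cfg, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    // A metadata-only client: every GET/LIST/WATCH returns PartialObjectMetadata.
    metaClient, err := metadata.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}
    factory := metadatainformer.NewSharedInformerFactory(metaClient, 10*time.Minute)
    informer := factory.ForResource(gvr).Informer()

    // Handlers receive *metav1.PartialObjectMetadata, not *appsv1.ReplicaSet.
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            m := obj.(*metav1.PartialObjectMetadata)
            fmt.Println("replicaset metadata:", m.Namespace+"/"+m.Name)
        },
    })

    stop := make(chan struct{})
    defer close(stop)
    factory.Start(stop)
    cache.WaitForCacheSync(stop, informer.HasSynced)
    time.Sleep(time.Minute) // keep the sketch alive briefly to receive events
}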
49 changes: 41 additions & 8 deletions metricbeat/module/kubernetes/util/kubernetes.go
@@ -24,7 +24,10 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"

k8sclient "k8s.io/client-go/kubernetes"
k8sclientmeta "k8s.io/client-go/metadata"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
@@ -299,6 +302,7 @@ func createWatcher(
resource kubernetes.Resource,
options kubernetes.WatchOptions,
client k8sclient.Interface,
metadataClient k8sclientmeta.Interface,
resourceWatchers *Watchers,
namespace string,
extraWatcher bool) (bool, error) {
@@ -326,9 +330,27 @@
if isNamespaced(resourceName) {
options.Namespace = namespace
}
watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
var (
watcher kubernetes.Watcher
err error
)
switch resource.(type) {
// use a metadata informer for ReplicaSets, as we only need their metadata
case *kubernetes.ReplicaSet:
watcher, err = kubernetes.NewNamedMetadataWatcher(
"resource_metadata_enricher_rs",
client,
metadataClient,
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
options,
nil,
metadata.RemoveUnnecessaryReplicaSetData,
)
default:
watcher, err = kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
}
if err != nil {
return false, err
return false, fmt.Errorf("error creating watcher for %T: %w", resource, err)
}
resourceWatchers.metaWatchersMap[resourceName] = &metaWatcher{
watcher: watcher,
@@ -410,6 +432,7 @@ func removeFromMetricsetsUsing(resourceName string, notUsingName string, resourc
// createAllWatchers creates all the watchers required by a metricset
func createAllWatchers(
client k8sclient.Interface,
metadataClient k8sclientmeta.Interface,
metricsetName string,
resourceName string,
nodeScope bool,
@@ -429,7 +452,7 @@
// Create the main watcher for the given resource.
// For example pod metricset's main watcher will be pod watcher.
// If it fails, we return an error, so we can stop the extra watchers from creating.
created, err := createWatcher(resourceName, res, *options, client, resourceWatchers, config.Namespace, false)
created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
if err != nil {
return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err)
} else if created {
@@ -444,7 +467,7 @@
for _, extra := range extraWatchers {
extraRes := getResource(extra)
if extraRes != nil {
created, err = createWatcher(extra, extraRes, *options, client, resourceWatchers, config.Namespace, true)
created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, config.Namespace, true)
if err != nil {
log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err)
} else {
@@ -580,18 +603,23 @@ func NewResourceMetadataEnricher(
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}

metricsetName := base.Name()
resourceName := getResourceName(metricsetName)
// Create all watchers needed for this metricset
err = createAllWatchers(client, metricsetName, resourceName, nodeScope, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers)
if err != nil {
log.Errorf("Error starting the watchers: %s", err)
return &nilEnricher{}
}

var specificMetaGen metadata.MetaGen
var generalMetaGen *metadata.Resource
var specificMetaGen metadata.MetaGen //nolint:typecheck // older versions of typecheck complain about this
var generalMetaGen *metadata.Resource //nolint:typecheck // older versions of typecheck complain about this
// We initialise the use_kubeadm variable based on modules KubeAdm base configuration
err = config.AddResourceMetadata.Namespace.SetBool("use_kubeadm", -1, commonMetaConfig.KubeAdm)
if err != nil {
@@ -756,10 +784,15 @@ func NewContainerMetadataEnricher(
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}

metricsetName := base.Name()

err = createAllWatchers(client, metricsetName, PodResource, nodeScope, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers)
if err != nil {
log.Errorf("Error starting the watchers: %s", err)
return &nilEnricher{}
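The ReplicaSet branch above also passes metadata.RemoveUnnecessaryReplicaSetData, whose implementation lives in the Beats kubernetes metadata package and is not part of this diff; the name suggests a transform that strips fields before objects enter the informer cache. For orientation only, a client-go style transform of that shape could look like the hypothetical sketch below (the function name and the set of retained fields are assumptions):

package sketch

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// trimReplicaSetMetadata (hypothetical name) runs before each object is stored in the
// informer cache and returns a copy with only the fields an enricher could need.
func trimReplicaSetMetadata(obj interface{}) (interface{}, error) {
    m, ok := obj.(*metav1.PartialObjectMetadata)
    if !ok {
        // Unexpected type (e.g. a tombstone): cache it unchanged rather than erroring.
        return obj, nil
    }
    return &metav1.PartialObjectMetadata{
        TypeMeta: m.TypeMeta,
        ObjectMeta: metav1.ObjectMeta{
            Name:      m.Name,
            Namespace: m.Namespace,
            UID:       m.UID,
            // ResourceVersion must survive the transform or the informer breaks.
            ResourceVersion: m.ResourceVersion,
            Labels:          m.Labels,
            OwnerReferences: m.OwnerReferences,
        },
    }, nil
}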
27 changes: 17 additions & 10 deletions metricbeat/module/kubernetes/util/kubernetes_test.go
@@ -38,6 +38,7 @@ import (

"github.com/stretchr/testify/require"
k8sfake "k8s.io/client-go/kubernetes/fake"
k8smetafake "k8s.io/client-go/metadata/fake"

"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-libs/logp"
@@ -68,6 +69,7 @@ func TestCreateWatcher(t *testing.T) {
resourceWatchers := NewWatchers()

client := k8sfake.NewSimpleClientset()
metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme())
config := &kubernetesConfig{
Namespace: "test-ns",
SyncPeriod: time.Minute,
@@ -78,7 +80,7 @@
options, err := getWatchOptions(config, false, client, log)
require.NoError(t, err)

created, err := createWatcher(NamespaceResource, &kubernetes.Node{}, *options, client, resourceWatchers, config.Namespace, false)
created, err := createWatcher(NamespaceResource, &kubernetes.Node{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
require.True(t, created)
require.NoError(t, err)

@@ -88,7 +90,7 @@
require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource].watcher)
resourceWatchers.lock.Unlock()

created, err = createWatcher(NamespaceResource, &kubernetes.Namespace{}, *options, client, resourceWatchers, config.Namespace, true)
created, err = createWatcher(NamespaceResource, &kubernetes.Namespace{}, *options, client, metadataClient, resourceWatchers, config.Namespace, true)
require.False(t, created)
require.NoError(t, err)

@@ -98,7 +100,7 @@
require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource].watcher)
resourceWatchers.lock.Unlock()

created, err = createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false)
created, err = createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
require.True(t, created)
require.NoError(t, err)

@@ -113,6 +115,7 @@ func TestAddToMetricsetsUsing(t *testing.T) {
resourceWatchers := NewWatchers()

client := k8sfake.NewSimpleClientset()
metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme())
config := &kubernetesConfig{
Namespace: "test-ns",
SyncPeriod: time.Minute,
@@ -124,7 +127,7 @@
require.NoError(t, err)

// Create the new entry with watcher and nil string array first
created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false)
created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
require.True(t, created)
require.NoError(t, err)

@@ -150,6 +153,7 @@ func TestRemoveFromMetricsetsUsing(t *testing.T) {
resourceWatchers := NewWatchers()

client := k8sfake.NewSimpleClientset()
metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme())
config := &kubernetesConfig{
Namespace: "test-ns",
SyncPeriod: time.Minute,
@@ -161,7 +165,7 @@
require.NoError(t, err)

// Create the new entry with watcher and nil string array first
created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false)
created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
require.True(t, created)
require.NoError(t, err)

@@ -190,6 +194,7 @@ func TestCreateAllWatchers(t *testing.T) {
resourceWatchers := NewWatchers()

client := k8sfake.NewSimpleClientset()
metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme())
config := &kubernetesConfig{
Namespace: "test-ns",
SyncPeriod: time.Minute,
@@ -202,7 +207,7 @@
log := logp.NewLogger("test")

// Start watchers based on a resource that does not exist should cause an error
err := createAllWatchers(client, "does-not-exist", "does-not-exist", false, config, log, resourceWatchers)
err := createAllWatchers(client, metadataClient, "does-not-exist", "does-not-exist", false, config, log, resourceWatchers)
require.Error(t, err)
resourceWatchers.lock.Lock()
require.Equal(t, 0, len(resourceWatchers.metaWatchersMap))
@@ -211,7 +216,7 @@
// Start watcher for a resource that requires other resources, should start all the watchers
metricsetPod := "pod"
extras := getExtraWatchers(PodResource, config.AddResourceMetadata)
err = createAllWatchers(client, metricsetPod, PodResource, false, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetPod, PodResource, false, config, log, resourceWatchers)
require.NoError(t, err)

// Check that all the required watchers are in the map
@@ -242,14 +247,15 @@ func TestCreateMetaGen(t *testing.T) {
},
}
client := k8sfake.NewSimpleClientset()
metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme())

_, err = createMetadataGen(client, commonConfig, config.AddResourceMetadata, DeploymentResource, resourceWatchers)
// At this point, no watchers were created
require.Error(t, err)

// Create the watchers necessary for the metadata generator
metricsetDeployment := "state_deployment"
err = createAllWatchers(client, metricsetDeployment, DeploymentResource, false, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetDeployment, DeploymentResource, false, config, log, resourceWatchers)
require.NoError(t, err)

// Create the generators, this time without error
@@ -282,6 +288,7 @@ func TestCreateMetaGenSpecific(t *testing.T) {
},
}
client := k8sfake.NewSimpleClientset()
metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme())

// For pod:
metricsetPod := "pod"
@@ -291,7 +298,7 @@
require.Error(t, err)

// Create the pod resource + the extras
err = createAllWatchers(client, metricsetPod, PodResource, false, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetPod, PodResource, false, config, log, resourceWatchers)
require.NoError(t, err)

_, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, PodResource, resourceWatchers)
@@ -304,7 +311,7 @@

// Create the service resource + the extras
metricsetService := "state_service"
err = createAllWatchers(client, metricsetService, ServiceResource, false, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetService, ServiceResource, false, config, log, resourceWatchers)
require.NoError(t, err)

_, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, ServiceResource, resourceWatchers)
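The tests above construct the fake metadata client but never seed it. As a complement, the hypothetical test below sketches how such a client could serve ReplicaSet metadata directly; the test name, the seeded object, and the extra AddMetaToScheme call are assumptions rather than part of this commit:

package util

import (
    "context"
    "testing"

    "github.com/stretchr/testify/require"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    k8smetafake "k8s.io/client-go/metadata/fake"
)

// TestFakeReplicaSetMetadata is a hypothetical test, not part of this commit.
func TestFakeReplicaSetMetadata(t *testing.T) {
    scheme := k8smetafake.NewTestScheme()
    // Seeding objects appears to require the meta.k8s.io types to be registered
    // (an assumption; the tests in this commit create the client without seed objects).
    require.NoError(t, metav1.AddMetaToScheme(scheme))

    rs := &metav1.PartialObjectMetadata{
        TypeMeta:   metav1.TypeMeta{APIVersion: "apps/v1", Kind: "ReplicaSet"},
        ObjectMeta: metav1.ObjectMeta{Name: "nginx-7c79c4bf97", Namespace: "default"},
    }
    metadataClient := k8smetafake.NewSimpleMetadataClient(scheme, rs)

    gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}
    list, err := metadataClient.Resource(gvr).Namespace("default").List(context.Background(), metav1.ListOptions{})
    require.NoError(t, err)
    require.Len(t, list.Items, 1)
    require.Equal(t, "nginx-7c79c4bf97", list.Items[0].Name)
}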

