diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 28160619005b..018f1105cc76 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -363,6 +363,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added Cisco Meraki module {pull}40836[40836] - Added Palo Alto Networks module {pull}40686[40686] - Restore docker.network.in.* and docker.network.out.* fields in docker module {pull}40968[40968] +- Only watch metadata for ReplicaSets in metricbeat k8s module {pull}41100[41100] *Metricbeat* diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 581c9417516d..bfd7ebf89dfb 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -24,7 +24,10 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/runtime/schema" + k8sclient "k8s.io/client-go/kubernetes" + k8sclientmeta "k8s.io/client-go/metadata" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" @@ -299,6 +302,7 @@ func createWatcher( resource kubernetes.Resource, options kubernetes.WatchOptions, client k8sclient.Interface, + metadataClient k8sclientmeta.Interface, resourceWatchers *Watchers, namespace string, extraWatcher bool) (bool, error) { @@ -326,9 +330,27 @@ func createWatcher( if isNamespaced(resourceName) { options.Namespace = namespace } - watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil) + var ( + watcher kubernetes.Watcher + err error + ) + switch resource.(type) { + // use a metadata informer for ReplicaSets, as we only need their metadata + case *kubernetes.ReplicaSet: + watcher, err = kubernetes.NewNamedMetadataWatcher( + "resource_metadata_enricher_rs", + client, + metadataClient, + schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}, + options, + nil, + metadata.RemoveUnnecessaryReplicaSetData, + ) + default: + watcher, err = kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil) + } if err != nil { - return false, err + return false, fmt.Errorf("error creating watcher for %T: %w", resource, err) } resourceWatchers.metaWatchersMap[resourceName] = &metaWatcher{ watcher: watcher, @@ -410,6 +432,7 @@ func removeFromMetricsetsUsing(resourceName string, notUsingName string, resourc // createAllWatchers creates all the watchers required by a metricset func createAllWatchers( client k8sclient.Interface, + metadataClient k8sclientmeta.Interface, metricsetName string, resourceName string, nodeScope bool, @@ -429,7 +452,7 @@ func createAllWatchers( // Create the main watcher for the given resource. // For example pod metricset's main watcher will be pod watcher. // If it fails, we return an error, so we can stop the extra watchers from creating. - created, err := createWatcher(resourceName, res, *options, client, resourceWatchers, config.Namespace, false) + created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, config.Namespace, false) if err != nil { return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err) } else if created { @@ -444,7 +467,7 @@ func createAllWatchers( for _, extra := range extraWatchers { extraRes := getResource(extra) if extraRes != nil { - created, err = createWatcher(extra, extraRes, *options, client, resourceWatchers, config.Namespace, true) + created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, config.Namespace, true) if err != nil { log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err) } else { @@ -580,18 +603,23 @@ func NewResourceMetadataEnricher( log.Errorf("Error creating Kubernetes client: %s", err) return &nilEnricher{} } + metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + log.Errorf("Error creating Kubernetes client: %s", err) + return &nilEnricher{} + } metricsetName := base.Name() resourceName := getResourceName(metricsetName) // Create all watchers needed for this metricset - err = createAllWatchers(client, metricsetName, resourceName, nodeScope, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers) if err != nil { log.Errorf("Error starting the watchers: %s", err) return &nilEnricher{} } - var specificMetaGen metadata.MetaGen - var generalMetaGen *metadata.Resource + var specificMetaGen metadata.MetaGen //nolint:typecheck // older versions of typecheck complain about this + var generalMetaGen *metadata.Resource //nolint:typecheck // older versions of typecheck complain about this // We initialise the use_kubeadm variable based on modules KubeAdm base configuration err = config.AddResourceMetadata.Namespace.SetBool("use_kubeadm", -1, commonMetaConfig.KubeAdm) if err != nil { @@ -756,10 +784,15 @@ func NewContainerMetadataEnricher( log.Errorf("Error creating Kubernetes client: %s", err) return &nilEnricher{} } + metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + log.Errorf("Error creating Kubernetes client: %s", err) + return &nilEnricher{} + } metricsetName := base.Name() - err = createAllWatchers(client, metricsetName, PodResource, nodeScope, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers) if err != nil { log.Errorf("Error starting the watchers: %s", err) return &nilEnricher{} diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index 61da906372f4..30ffb45d5b03 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -38,6 +38,7 @@ import ( "github.com/stretchr/testify/require" k8sfake "k8s.io/client-go/kubernetes/fake" + k8smetafake "k8s.io/client-go/metadata/fake" "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent-libs/logp" @@ -68,6 +69,7 @@ func TestCreateWatcher(t *testing.T) { resourceWatchers := NewWatchers() client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -78,7 +80,7 @@ func TestCreateWatcher(t *testing.T) { options, err := getWatchOptions(config, false, client, log) require.NoError(t, err) - created, err := createWatcher(NamespaceResource, &kubernetes.Node{}, *options, client, resourceWatchers, config.Namespace, false) + created, err := createWatcher(NamespaceResource, &kubernetes.Node{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false) require.True(t, created) require.NoError(t, err) @@ -88,7 +90,7 @@ func TestCreateWatcher(t *testing.T) { require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource].watcher) resourceWatchers.lock.Unlock() - created, err = createWatcher(NamespaceResource, &kubernetes.Namespace{}, *options, client, resourceWatchers, config.Namespace, true) + created, err = createWatcher(NamespaceResource, &kubernetes.Namespace{}, *options, client, metadataClient, resourceWatchers, config.Namespace, true) require.False(t, created) require.NoError(t, err) @@ -98,7 +100,7 @@ func TestCreateWatcher(t *testing.T) { require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource].watcher) resourceWatchers.lock.Unlock() - created, err = createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false) + created, err = createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false) require.True(t, created) require.NoError(t, err) @@ -113,6 +115,7 @@ func TestAddToMetricsetsUsing(t *testing.T) { resourceWatchers := NewWatchers() client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -124,7 +127,7 @@ func TestAddToMetricsetsUsing(t *testing.T) { require.NoError(t, err) // Create the new entry with watcher and nil string array first - created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false) + created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false) require.True(t, created) require.NoError(t, err) @@ -150,6 +153,7 @@ func TestRemoveFromMetricsetsUsing(t *testing.T) { resourceWatchers := NewWatchers() client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -161,7 +165,7 @@ func TestRemoveFromMetricsetsUsing(t *testing.T) { require.NoError(t, err) // Create the new entry with watcher and nil string array first - created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, resourceWatchers, config.Namespace, false) + created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false) require.True(t, created) require.NoError(t, err) @@ -190,6 +194,7 @@ func TestCreateAllWatchers(t *testing.T) { resourceWatchers := NewWatchers() client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) config := &kubernetesConfig{ Namespace: "test-ns", SyncPeriod: time.Minute, @@ -202,7 +207,7 @@ func TestCreateAllWatchers(t *testing.T) { log := logp.NewLogger("test") // Start watchers based on a resource that does not exist should cause an error - err := createAllWatchers(client, "does-not-exist", "does-not-exist", false, config, log, resourceWatchers) + err := createAllWatchers(client, metadataClient, "does-not-exist", "does-not-exist", false, config, log, resourceWatchers) require.Error(t, err) resourceWatchers.lock.Lock() require.Equal(t, 0, len(resourceWatchers.metaWatchersMap)) @@ -211,7 +216,7 @@ func TestCreateAllWatchers(t *testing.T) { // Start watcher for a resource that requires other resources, should start all the watchers metricsetPod := "pod" extras := getExtraWatchers(PodResource, config.AddResourceMetadata) - err = createAllWatchers(client, metricsetPod, PodResource, false, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetPod, PodResource, false, config, log, resourceWatchers) require.NoError(t, err) // Check that all the required watchers are in the map @@ -242,6 +247,7 @@ func TestCreateMetaGen(t *testing.T) { }, } client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) _, err = createMetadataGen(client, commonConfig, config.AddResourceMetadata, DeploymentResource, resourceWatchers) // At this point, no watchers were created @@ -249,7 +255,7 @@ func TestCreateMetaGen(t *testing.T) { // Create the watchers necessary for the metadata generator metricsetDeployment := "state_deployment" - err = createAllWatchers(client, metricsetDeployment, DeploymentResource, false, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetDeployment, DeploymentResource, false, config, log, resourceWatchers) require.NoError(t, err) // Create the generators, this time without error @@ -282,6 +288,7 @@ func TestCreateMetaGenSpecific(t *testing.T) { }, } client := k8sfake.NewSimpleClientset() + metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme()) // For pod: metricsetPod := "pod" @@ -291,7 +298,7 @@ func TestCreateMetaGenSpecific(t *testing.T) { require.Error(t, err) // Create the pod resource + the extras - err = createAllWatchers(client, metricsetPod, PodResource, false, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetPod, PodResource, false, config, log, resourceWatchers) require.NoError(t, err) _, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, PodResource, resourceWatchers) @@ -304,7 +311,7 @@ func TestCreateMetaGenSpecific(t *testing.T) { // Create the service resource + the extras metricsetService := "state_service" - err = createAllWatchers(client, metricsetService, ServiceResource, false, config, log, resourceWatchers) + err = createAllWatchers(client, metadataClient, metricsetService, ServiceResource, false, config, log, resourceWatchers) require.NoError(t, err) _, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, ServiceResource, resourceWatchers)