From a584a6d93a74915ecbc283bf344b95a180ac0f5c Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Mon, 27 Jan 2025 11:58:21 +0100 Subject: [PATCH] [otel/kube-stack]: add gateway collector configuration (#6444) * feat: move telemetry aggregation and forwarding to gateway * ci: use Elastic envs in gateway * chore: add changelog entry * fix: format values file * feat: add apm loadbalancing * chore: increase resource limits * revert resource limits increase * chore: remove config warnings * docs: add Gateway collectors section * revert: enable daemonset storagechecks * rename metrics/otel pipeline and use signaltometrics * unify k8s and host metrics pipelines * use default traceID as loadbalancing routing_key * chore: reuse k8s integration test helpers * format values with Helm linter * replace loadbalancing in favor of headless otlp * Update testing/integration/otel_helm_test.go Co-authored-by: Panos Koutsovasilis * Update testing/integration/otel_helm_test.go Co-authored-by: Panos Koutsovasilis * rename k8s values options helper function * move process attributes remove processor to gateway * add batch processor for aggregation pipeline * enable compression for cluster otlp connections * chore: remove elastic endpoint references * fix: do not generate service's signals for non apm data * Revert "fix: do not generate service's signals for non apm data" This reverts commit ffa6620e1f38dd9d301be48c331d93fbe6cf26ef. 
* fix: set agent.name as edot-collector * fix: enable daemon hostNetwork * set unknown as default signaltometrics agent.name resource attribute * remove signaltometrics for metrics-only services --------- Co-authored-by: Panos Koutsovasilis (cherry picked from commit daed81e0771ae2a5149a4ac89abcdb497c1ee289) --- ...llector-in-Helm-kube-stack-deployment.yaml | 32 + .../helm/edot-collector/kube-stack/README.md | 15 +- .../edot-collector/kube-stack/values.yaml | 814 +++++++++--------- testing/integration/otel_helm_test.go | 154 ++-- 4 files changed, 519 insertions(+), 496 deletions(-) create mode 100644 changelog/fragments/1735306293-add-gateway-collector-in-Helm-kube-stack-deployment.yaml diff --git a/changelog/fragments/1735306293-add-gateway-collector-in-Helm-kube-stack-deployment.yaml b/changelog/fragments/1735306293-add-gateway-collector-in-Helm-kube-stack-deployment.yaml new file mode 100644 index 00000000000..eab0eb63e80 --- /dev/null +++ b/changelog/fragments/1735306293-add-gateway-collector-in-Helm-kube-stack-deployment.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: add gateway collector in Helm kube-stack deployment + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. 
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: "elastic-agent" + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/deploy/helm/edot-collector/kube-stack/README.md b/deploy/helm/edot-collector/kube-stack/README.md index b8298e8aee7..caf0463645c 100644 --- a/deploy/helm/edot-collector/kube-stack/README.md +++ b/deploy/helm/edot-collector/kube-stack/README.md @@ -17,15 +17,26 @@ The DaemonSet collectors handle the following data: - Logs: Utilizes [File Log Receiver receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/filelogreceiver#readme) to gather logs from all Pods running on the respective node. - OTLP Traces: Utilizes [OTLP Receiver]( https://github.com/open-telemetry/opentelemetry-collector/blob/main/receiver/otlpreceiver#readme) which configures both HTTP and GRPC endpoints on the node to receive OTLP trace data. -### Deployment collector +### Deployment collectors + +#### Cluster The OpenTelemetry components deployed within a Deployment collector focus on gathering data at the cluster level rather than at individual nodes. 
A Deployment instance of the collector operates as a standalone (unlike DaemonSet collector instances, which are deployed on every node) -The Deployment collector handles the following data: +The Cluster Deployment collector handles the following data: - Kubernetes Events: Monitors and collects events occurring across the entire Kubernetes cluster, utilizing [Kubernetes Objects Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/k8sobjectsreceiver#readme). - Cluster Metrics: Captures metrics that provide insights into the overall health and performance of the Kubernetes cluster, utilizing [Kubernetes Cluster Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/k8sclusterreceiver#readme). +#### Gateway + +The OpenTelemetry components deployed within the `Gateway` Deployment collectors focus on processing and exporting OTLP data to Elasticsearch. Processing components: + +- [Elastic Trace processor](https://github.com/elastic/opentelemetry-collector-components/tree/main/processor/elastictraceprocessor): The processor enriches traces with elastic specific requirements. It uses opentelemetry-lib to perform the actual enrichments. +- [Elastic Infra Metrics processor](https://github.com/elastic/opentelemetry-collector-components/tree/main/processor/elasticinframetricsprocessor): The Elastic Infra Metrics Processor is used to bridge the gap between OTEL and Elastic Infra Metrics. +- [LSM interval processor](https://github.com/elastic/opentelemetry-collector-components/tree/main/processor/lsmintervalprocessor): [Interval processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/intervalprocessor) with db-backed persistence. +- [Signal to Metrics connector](https://github.com/elastic/opentelemetry-collector-components/tree/main/connector/signaltometricsconnector): Produces metrics from all signal types (traces, logs, or metrics). 
+ ### Auto-instrumentation The Helm Chart is configured to enable zero-code instrumentation using the [Operator's Instrumentation resource](https://github.com/open-telemetry/opentelemetry-operator/?tab=readme-ov-file#opentelemetry-auto-instrumentation-injection) for the following programming languages: diff --git a/deploy/helm/edot-collector/kube-stack/values.yaml b/deploy/helm/edot-collector/kube-stack/values.yaml index 3324d0eabc0..841efeb91a2 100644 --- a/deploy/helm/edot-collector/kube-stack/values.yaml +++ b/deploy/helm/edot-collector/kube-stack/values.yaml @@ -33,44 +33,19 @@ collectors: # Cluster is a K8s deployment EDOT collector focused on gathering telemetry # at the cluster level (Kubernetes Events and cluster metrics). cluster: - # Configure the pods resources to control CPU and memory usage. - # resources: - # limits: - # cpu: 100m - # memory: 500Mi - # requests: - # cpu: 100m - # memory: 500Mi env: - name: ELASTIC_AGENT_OTEL value: '"true"' - - name: ELASTIC_ENDPOINT - valueFrom: - secretKeyRef: - name: elastic-secret-otel - key: elastic_endpoint - - name: ELASTIC_API_KEY - valueFrom: - secretKeyRef: - name: elastic-secret-otel - key: elastic_api_key config: exporters: # [Debug exporter](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/debugexporter/README.md) debug: verbosity: basic # Options: basic, detailed. Choose verbosity level for debug logs. # [Elasticsearch exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/README.md) - elasticsearch/otel: - endpoints: # List of Elasticsearch endpoints. - - ${env:ELASTIC_ENDPOINT} - api_key: ${env:ELASTIC_API_KEY} # API key for Elasticsearch authentication. 
- logs_dynamic_index: - enabled: true - # Enable in order to skip the SSL certificate Check - # tls: - # insecure_skip_verify: true - mapping: - mode: otel + otlp/gateway: + endpoint: "http://opentelemetry-kube-stack-gateway-collector:4317" + tls: + insecure: true processors: # [Resource Detection Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourcedetectionprocessor) resourcedetection/eks: @@ -184,7 +159,7 @@ collectors: metrics: exporters: - debug - - elasticsearch/otel + - otlp/gateway processors: - k8sattributes - resourcedetection/eks @@ -204,41 +179,17 @@ collectors: - resource/hostname exporters: - debug - - elasticsearch/otel + - otlp/gateway # Daemon is a K8s daemonset EDOT collector focused on gathering telemetry at # node level and exposing an OTLP endpoint for data ingestion. # Auto-instrumentation SDKs will use this endpoint. daemon: - # Configure the pods resources to control CPU and memory usage. - resources: - limits: - cpu: 1500m - memory: 1500Mi - requests: - cpu: 100m - memory: 500Mi env: # Work around for open /mounts error: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35990 - name: HOST_PROC_MOUNTINFO value: "" - name: ELASTIC_AGENT_OTEL value: '"true"' - - name: ELASTIC_ENDPOINT - valueFrom: - secretKeyRef: - name: elastic-secret-otel - key: elastic_endpoint - - name: ELASTIC_API_KEY - valueFrom: - secretKeyRef: - name: elastic-secret-otel - key: elastic_api_key - - name: GOMAXPROCS - valueFrom: - resourceFieldRef: - resource: limits.cpu - - name: GOMEMLIMIT - value: "1025MiB" presets: logsCollection: enabled: true # Enable/disable the collection of node's logs. 
@@ -249,299 +200,17 @@ collectors: runAsGroup: 0 scrape_configs_file: "" # [Prometheus metrics](https://github.com/open-telemetry/opentelemetry-helm-charts/tree/main/charts/opentelemetry-kube-stack#scrape_configs_file-details) config: - connectors: - # [Signal To Metrics Connector](https://github.com/elastic/opentelemetry-collector-components/tree/main/connector/signaltometricsconnector) - signaltometrics: # Produces metrics from all signal types (traces, logs, or metrics). - logs: - - name: service_summary - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - attributes: - - key: metricset.name - default_value: service_summary - sum: - value: "1" - datapoints: - - name: service_summary - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - attributes: - - key: metricset.name - default_value: service_summary - sum: - value: "1" - spans: - - name: service_summary - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - attributes: - - key: metricset.name - default_value: service_summary - sum: - value: Int(AdjustedCount()) - - name: transaction.duration.histogram - description: APM service transaction aggregated metrics as histogram - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - attributes: - - key: transaction.root - - key: transaction.type - - key: metricset.name - default_value: service_transaction - - key: 
elasticsearch.mapping.hints - default_value: [_doc_count] - unit: us - exponential_histogram: - value: Microseconds(end_time - start_time) - - name: transaction.duration.summary - description: APM service transaction aggregated metrics as summary - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - attributes: - - key: transaction.root - - key: transaction.type - - key: metricset.name - default_value: service_transaction - - key: elasticsearch.mapping.hints - default_value: [aggregate_metric_double] - unit: us - histogram: - buckets: [1] - value: Microseconds(end_time - start_time) - - name: transaction.duration.histogram - description: APM transaction aggregated metrics as histogram - ephemeral_resource_attribute: true - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - - key: container.id - - key: k8s.pod.name - - key: service.version - - key: service.instance.id # service.node.name - - key: process.runtime.name # service.runtime.name - - key: process.runtime.version # service.runtime.version - - key: telemetry.sdk.version # service.language.version?? 
- - key: host.name - - key: os.type # host.os.platform - - key: faas.instance - - key: faas.name - - key: faas.version - - key: cloud.provider - - key: cloud.region - - key: cloud.availability_zone - - key: cloud.platform # cloud.servicename - - key: cloud.account.id - attributes: - - key: transaction.root - - key: transaction.name - - key: transaction.type - - key: transaction.result - - key: event.outcome - - key: metricset.name - default_value: transaction - - key: elasticsearch.mapping.hints - default_value: [_doc_count] - unit: us - exponential_histogram: - value: Microseconds(end_time - start_time) - - name: transaction.duration.summary - description: APM transaction aggregated metrics as summary - ephemeral_resource_attribute: true - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - - key: container.id - - key: k8s.pod.name - - key: service.version - - key: service.instance.id # service.node.name - - key: process.runtime.name # service.runtime.name - - key: process.runtime.version # service.runtime.version - - key: telemetry.sdk.version # service.language.version?? 
- - key: host.name - - key: os.type # host.os.platform - - key: faas.instance - - key: faas.name - - key: faas.version - - key: cloud.provider - - key: cloud.region - - key: cloud.availability_zone - - key: cloud.platform # cloud.servicename - - key: cloud.account.id - attributes: - - key: transaction.root - - key: transaction.name - - key: transaction.type - - key: transaction.result - - key: event.outcome - - key: metricset.name - default_value: transaction - - key: elasticsearch.mapping.hints - default_value: [aggregate_metric_double] - unit: us - histogram: - buckets: [1] - value: Microseconds(end_time - start_time) - - name: span.destination.service.response_time.sum.us - description: APM span destination metrics - ephemeral_resource_attribute: true - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - attributes: - - key: span.name - - key: event.outcome - - key: service.target.type - - key: service.target.name - - key: span.destination.service.resource - - key: metricset.name - default_value: service_destination - unit: us - sum: - value: Double(Microseconds(end_time - start_time)) - - name: span.destination.service.response_time.count - description: APM span destination metrics - ephemeral_resource_attribute: true - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - attributes: - - key: span.name - - key: event.outcome - - key: service.target.type - - key: service.target.name - - key: span.destination.service.resource - - key: metricset.name - default_value: service_destination - sum: - value: Int(AdjustedCount()) - # event.success_count is populated using 2 metric definition with different conditions - # and value for the histogram 
bucket based on event outcome. Both metric definition - # are created using same name and attribute and will result in a single histogram. - # We use mapping hint of aggregate_metric_double, so, only the sum and the count - # values are required and the actual histogram bucket is ignored. - - name: event.success_count - description: Success count as a metric for service transaction - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - attributes: - - key: transaction.root - - key: transaction.type - - key: metricset.name - default_value: service_transaction - - key: elasticsearch.mapping.hints - default_value: [aggregate_metric_double] - conditions: - - attributes["event.outcome"] != nil and attributes["event.outcome"] == "success" - unit: us - histogram: - buckets: [1] - count: Int(AdjustedCount()) - value: Int(AdjustedCount()) - - name: event.success_count - description: Success count as a metric for service transaction - include_resource_attributes: - - key: service.name - - key: deployment.environment # service.environment - - key: telemetry.sdk.language # service.language.name - - key: agent.name # set via elastictraceprocessor - attributes: - - key: transaction.root - - key: transaction.type - - key: metricset.name - default_value: service_transaction - - key: elasticsearch.mapping.hints - default_value: [aggregate_metric_double] - conditions: - - attributes["event.outcome"] != nil and attributes["event.outcome"] != "success" - unit: us - histogram: - buckets: [0] - count: Int(AdjustedCount()) - value: Double(0) exporters: # [Debug exporter](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/debugexporter/README.md) debug: verbosity: basic - # [Elasticsearch 
exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/README.md) - elasticsearch/otel: - endpoints: - - ${env:ELASTIC_ENDPOINT} - api_key: ${env:ELASTIC_API_KEY} - metrics_dynamic_index: - enabled: true - logs_dynamic_index: - enabled: true - traces_dynamic_index: - enabled: true - flush: - interval: 10s - # tls: - # insecure_skip_verify: true - mapping: - mode: otel - # [Elasticsearch exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/README.md) - elasticsearch/ecs: - endpoints: - - ${env:ELASTIC_ENDPOINT} - api_key: ${env:ELASTIC_API_KEY} - # tls: - # insecure_skip_verify: true - mapping: - mode: ecs + otlp/gateway: + endpoint: "http://opentelemetry-kube-stack-gateway-collector-headless:4317" + tls: + insecure: true processors: # [Batch Processor](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/batchprocessor) batch: {} - # [Elastic Trace Processor](https://github.com/elastic/opentelemetry-collector-components/tree/main/processor/elastictraceprocessor) - elastictrace: {} # The processor enriches traces with elastic specific requirements. 
- # [LSM Interval Processor](https://github.com/elastic/opentelemetry-collector-components/tree/main/processor/lsmintervalprocessor) - lsminterval: - intervals: - - duration: 1m - statements: - - set(resource.attributes["metricset.interval"], "1m") - - set(attributes["data_stream.dataset"], Concat([attributes["metricset.name"], "1m"], ".")) - - set(attributes["processor.event"], "metric") - - duration: 10m - statements: - - set(resource.attributes["metricset.interval"], "10m") - - set(attributes["data_stream.dataset"], Concat([attributes["metricset.name"], "10m"], ".")) - - set(attributes["processor.event"], "metric") - - duration: 60m - statements: - - set(resource.attributes["metricset.interval"], "60m") - - set(attributes["data_stream.dataset"], Concat([attributes["metricset.name"], "60m"], ".")) - - set(attributes["processor.event"], "metric") - # [Elastic Infra Metrics Processor](https://github.com/elastic/opentelemetry-collector-components/tree/main/processor/elasticinframetricsprocessor) - elasticinframetrics: - add_system_metrics: true - add_k8s_metrics: true - drop_original: true # [Resource Detection Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourcedetectionprocessor) resourcedetection/eks: detectors: [env, eks] # Detects resources from environment variables and EKS (Elastic Kubernetes Service). 
@@ -626,18 +295,6 @@ collectors: - key: cloud.instance.id from_attribute: host.id action: insert - resource/process: - attributes: - - key: process.executable.name - action: delete - - key: process.executable.path - action: delete - # [Attributes Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/attributesprocessor) - attributes/dataset: - actions: - - key: event.dataset - from_attribute: data_stream.dataset - action: upsert # [K8s Attributes Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/k8sattributesprocessor) k8sattributes: filter: @@ -675,28 +332,6 @@ collectors: - tag_name: app.label.version key: app.kubernetes.io/version from: pod - k8sattributes/ecs: - filter: - # Only retrieve pods running on the same node as the collector - node_from_env_var: OTEL_K8S_NODE_NAME - passthrough: false - pod_association: - # Below association takes a look at the k8s.pod.ip and k8s.pod.uid resource attributes or connection's context, and tries to match it with the pod having the same attribute. 
- - sources: - - from: resource_attribute - name: k8s.pod.ip - - sources: - - from: resource_attribute - name: k8s.pod.uid - - sources: - - from: connection - extract: - metadata: - - "k8s.replicaset.name" - - "k8s.statefulset.name" - - "k8s.daemonset.name" - - "k8s.cronjob.name" - - "k8s.job.name" receivers: # [OTLP Receiver](https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver/otlpreceiver) otlp: @@ -712,7 +347,7 @@ collectors: start_at: end exclude: # exlude collector logs - - /var/log/pods/opentelemetry-operator-system_opentelemetry-kube-stack*/*/*.log + - /var/log/pods/*opentelemetry-kube-stack*/*/*.log include: - /var/log/pods/*/*/*.log include_file_name: false @@ -841,32 +476,14 @@ collectors: - resource/hostname - resource/cloud exporters: - - debug - - elasticsearch/otel + - otlp/gateway metrics/node/otel: receivers: - kubeletstats - processors: - - batch - - k8sattributes - - resourcedetection/system - - resourcedetection/eks - - resourcedetection/gcp - - resourcedetection/aks - - resource/k8s - - resource/hostname - - resource/cloud - exporters: - - debug - - elasticsearch/otel - metrics/node/ecs: - receivers: - hostmetrics - - kubeletstats processors: - - elasticinframetrics - batch - - k8sattributes/ecs + - k8sattributes - resourcedetection/system - resourcedetection/eks - resourcedetection/gcp @@ -874,11 +491,9 @@ collectors: - resource/k8s - resource/hostname - resource/cloud - - attributes/dataset - - resource/process exporters: - - debug - - elasticsearch/ecs + # - debug + - otlp/gateway metrics/otel-apm: receivers: - otlp @@ -886,9 +501,7 @@ collectors: - batch - resource/hostname exporters: - - debug - - signaltometrics - - elasticsearch/otel + - otlp/gateway logs/apm: receivers: - otlp @@ -896,25 +509,404 @@ collectors: - batch - resource/hostname exporters: - - debug - - signaltometrics - - elasticsearch/otel + - otlp/gateway traces/apm: receivers: - otlp processors: - batch - - elastictrace - resource/hostname 
exporters: - - debug - - signaltometrics - - elasticsearch/otel + - otlp/gateway + # Gateway is a K8s deployment EDOT collector focused on processing and + # forwarding telemetry to an Elasticsearch endpoint. + gateway: + resources: + limits: + cpu: 1500m + memory: 1500Mi + requests: + cpu: 100m + memory: 500Mi + suffix: gateway + replicas: 2 + enabled: true + env: + - name: ELASTIC_AGENT_OTEL + value: '"true"' + - name: ELASTIC_ENDPOINT + valueFrom: + secretKeyRef: + name: elastic-secret-otel + key: elastic_endpoint + - name: ELASTIC_API_KEY + valueFrom: + secretKeyRef: + name: elastic-secret-otel + key: elastic_api_key + - name: GOMAXPROCS + valueFrom: + resourceFieldRef: + resource: limits.cpu + - name: GOMEMLIMIT + value: "1025MiB" + config: + connectors: + routing: + default_pipelines: [metrics/otel] + error_mode: ignore + match_once: true + table: + - context: metric + statement: route() where instrumentation_scope.name == "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver" or IsMatch(instrumentation_scope.name, "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/*") + pipelines: [metrics/infra/ecs, metrics/otel] + # [Signal To Metrics Connector](https://github.com/elastic/opentelemetry-collector-components/tree/main/connector/signaltometricsconnector) + signaltometrics: # Produces metrics from all signal types (traces, logs, or metrics). 
+ logs: + - name: service_summary + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + attributes: + - key: metricset.name + default_value: service_summary + sum: + value: "1" + datapoints: + - name: service_summary + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + attributes: + - key: metricset.name + default_value: service_summary + sum: + value: "1" + spans: + - name: service_summary + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + attributes: + - key: metricset.name + default_value: service_summary + sum: + value: Int(AdjustedCount()) + - name: transaction.duration.histogram + description: APM service transaction aggregated metrics as histogram + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + attributes: + - key: transaction.root + - key: transaction.type + - key: metricset.name + default_value: service_transaction + - key: elasticsearch.mapping.hints + default_value: [_doc_count] + unit: us + exponential_histogram: + value: Microseconds(end_time - start_time) + - name: transaction.duration.summary + description: APM service transaction aggregated metrics as summary + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + attributes: + - key: transaction.root + - key: transaction.type 
+ - key: metricset.name + default_value: service_transaction + - key: elasticsearch.mapping.hints + default_value: [aggregate_metric_double] + unit: us + histogram: + buckets: [1] + value: Microseconds(end_time - start_time) + - name: transaction.duration.histogram + description: APM transaction aggregated metrics as histogram + ephemeral_resource_attribute: true + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + - key: container.id + - key: k8s.pod.name + - key: service.version + - key: service.instance.id # service.node.name + - key: process.runtime.name # service.runtime.name + - key: process.runtime.version # service.runtime.version + - key: telemetry.sdk.version # service.language.version?? + - key: host.name + - key: os.type # host.os.platform + - key: faas.instance + - key: faas.name + - key: faas.version + - key: cloud.provider + - key: cloud.region + - key: cloud.availability_zone + - key: cloud.platform # cloud.servicename + - key: cloud.account.id + attributes: + - key: transaction.root + - key: transaction.name + - key: transaction.type + - key: transaction.result + - key: event.outcome + - key: metricset.name + default_value: transaction + - key: elasticsearch.mapping.hints + default_value: [_doc_count] + unit: us + exponential_histogram: + value: Microseconds(end_time - start_time) + - name: transaction.duration.summary + description: APM transaction aggregated metrics as summary + ephemeral_resource_attribute: true + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + - key: container.id + - key: k8s.pod.name + - key: service.version + - key: service.instance.id # service.node.name + - key: process.runtime.name # service.runtime.name + - 
key: process.runtime.version # service.runtime.version + - key: telemetry.sdk.version # service.language.version?? + - key: host.name + - key: os.type # host.os.platform + - key: faas.instance + - key: faas.name + - key: faas.version + - key: cloud.provider + - key: cloud.region + - key: cloud.availability_zone + - key: cloud.platform # cloud.servicename + - key: cloud.account.id + attributes: + - key: transaction.root + - key: transaction.name + - key: transaction.type + - key: transaction.result + - key: event.outcome + - key: metricset.name + default_value: transaction + - key: elasticsearch.mapping.hints + default_value: [aggregate_metric_double] + unit: us + histogram: + buckets: [1] + value: Microseconds(end_time - start_time) + - name: span.destination.service.response_time.sum.us + description: APM span destination metrics + ephemeral_resource_attribute: true + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + attributes: + - key: span.name + - key: event.outcome + - key: service.target.type + - key: service.target.name + - key: span.destination.service.resource + - key: metricset.name + default_value: service_destination + unit: us + sum: + value: Double(Microseconds(end_time - start_time)) + - name: span.destination.service.response_time.count + description: APM span destination metrics + ephemeral_resource_attribute: true + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + attributes: + - key: span.name + - key: event.outcome + - key: service.target.type + - key: service.target.name + - key: span.destination.service.resource + - key: metricset.name + default_value: service_destination + sum: + value: Int(AdjustedCount()) + # event.success_count 
is populated using 2 metric definitions with different conditions + # and values for the histogram bucket based on event outcome. Both metric definitions + # are created using the same name and attributes and will result in a single histogram. + # We use a mapping hint of aggregate_metric_double, so only the sum and the count + # values are required and the actual histogram bucket is ignored. + - name: event.success_count + description: Success count as a metric for service transaction + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + attributes: + - key: transaction.root + - key: transaction.type + - key: metricset.name + default_value: service_transaction + - key: elasticsearch.mapping.hints + default_value: [aggregate_metric_double] + conditions: + - attributes["event.outcome"] != nil and attributes["event.outcome"] == "success" + unit: us + histogram: + buckets: [1] + count: Int(AdjustedCount()) + value: Int(AdjustedCount()) + - name: event.success_count + description: Success count as a metric for service transaction + include_resource_attributes: + - key: service.name + - key: deployment.environment # service.environment + - key: telemetry.sdk.language # service.language.name + - key: agent.name + default_value: unknown + attributes: + - key: transaction.root + - key: transaction.type + - key: metricset.name + default_value: service_transaction + - key: elasticsearch.mapping.hints + default_value: [aggregate_metric_double] + conditions: + - attributes["event.outcome"] != nil and attributes["event.outcome"] != "success" + unit: us + histogram: + buckets: [0] + count: Int(AdjustedCount()) + value: Double(0) + receivers: + otlp: + protocols: + grpc: + endpoint: ${env:MY_POD_IP}:4317 + http: + endpoint: ${env:MY_POD_IP}:4318 + processors: + # [Elastic Infra Metrics
Processor](https://github.com/elastic/opentelemetry-collector-components/tree/main/processor/elasticinframetricsprocessor) + elasticinframetrics: + add_system_metrics: true + add_k8s_metrics: true + drop_original: true + # [Attributes Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/attributesprocessor) + attributes/dataset: + actions: + - key: event.dataset + from_attribute: data_stream.dataset + action: upsert + resource/process: + attributes: + - key: process.executable.name + action: delete + - key: process.executable.path + action: delete + batch: + send_batch_size: 1000 + timeout: 1s + send_batch_max_size: 1500 + batch/aggs: + send_batch_size: 16384 # 2x the default + timeout: 10s + # [Elastic Trace Processor](https://github.com/elastic/opentelemetry-collector-components/tree/main/processor/elastictraceprocessor) + elastictrace: {} # The processor enriches traces with elastic specific requirements. + # [LSM Interval Processor](https://github.com/elastic/opentelemetry-collector-components/tree/main/processor/lsmintervalprocessor) + lsminterval: + intervals: + - duration: 1m + statements: + - set(resource.attributes["metricset.interval"], "1m") + - set(attributes["data_stream.dataset"], Concat([attributes["metricset.name"], "1m"], ".")) + - set(attributes["processor.event"], "metric") + - duration: 10m + statements: + - set(resource.attributes["metricset.interval"], "10m") + - set(attributes["data_stream.dataset"], Concat([attributes["metricset.name"], "10m"], ".")) + - set(attributes["processor.event"], "metric") + - duration: 60m + statements: + - set(resource.attributes["metricset.interval"], "60m") + - set(attributes["data_stream.dataset"], Concat([attributes["metricset.name"], "60m"], ".")) + - set(attributes["processor.event"], "metric") + exporters: + debug: {} + # [Elasticsearch exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/README.md) + 
elasticsearch/otel: + endpoints: # List of Elasticsearch endpoints. + - ${env:ELASTIC_ENDPOINT} + api_key: ${env:ELASTIC_API_KEY} # API key for Elasticsearch authentication. + logs_dynamic_index: + enabled: true + # Enable in order to skip the SSL certificate Check + # tls: + # insecure_skip_verify: true + mapping: + mode: otel + elasticsearch/ecs: + endpoints: + - ${env:ELASTIC_ENDPOINT} + api_key: ${env:ELASTIC_API_KEY} + mapping: + mode: ecs + service: + pipelines: + metrics: + receivers: [otlp] + exporters: [routing] + metrics/infra/ecs: + receivers: [routing] + processors: + - elasticinframetrics + - attributes/dataset + - resource/process + - batch + exporters: [debug, elasticsearch/ecs] + metrics/otel: + receivers: [routing] + processors: [batch] + exporters: [debug, elasticsearch/otel] + logs: + receivers: [otlp] + processors: [batch] + exporters: [debug, signaltometrics, elasticsearch/otel] + traces: + receivers: [otlp] + processors: [batch, elastictrace] + exporters: [debug, signaltometrics, elasticsearch/otel] metrics/aggregated-otel-metrics: receivers: - signaltometrics processors: - - batch + - batch/aggs - lsminterval exporters: - debug diff --git a/testing/integration/otel_helm_test.go b/testing/integration/otel_helm_test.go index fbec0b19c27..f16664236a8 100644 --- a/testing/integration/otel_helm_test.go +++ b/testing/integration/otel_helm_test.go @@ -9,23 +9,22 @@ package integration import ( "context" "fmt" - "strings" "testing" "time" "github.com/stretchr/testify/require" "helm.sh/helm/v3/pkg/action" - "helm.sh/helm/v3/pkg/chart/loader" "helm.sh/helm/v3/pkg/cli" "helm.sh/helm/v3/pkg/cli/values" "helm.sh/helm/v3/pkg/getter" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/elastic/elastic-agent/pkg/testing/define" ) var ( - kubeStackChartVersion = "0.3.2" + kubeStackChartVersion = "0.3.9" kubeStackChartURL = 
"https://github.com/open-telemetry/opentelemetry-helm-charts/releases/download/opentelemetry-kube-stack-" + kubeStackChartVersion + "/opentelemetry-kube-stack-" + kubeStackChartVersion + ".tgz" ) @@ -55,22 +54,36 @@ func TestOtelKubeStackHelm(t *testing.T) { } testCases := []struct { - name string - helmReleaseName string - valuesFile string - atLeastValidatedPodsNumber int + name string + steps []k8sTestStep }{ { - name: "helm standalone agent default kubernetes privileged", - helmReleaseName: "kube-stack-otel", - valuesFile: "../../deploy/helm/edot-collector/kube-stack/values.yaml", - // - A Daemonset to collect K8s node's metrics and logs - // (1 EDOT collector pod per node) - // - A Cluster wide Deployment to collect K8s metrics and - // events (1 EDOT collector pod per cluster) - // - An OpenTelemetry Operator Deployment (1 pod per - // cluster) - atLeastValidatedPodsNumber: 3, + name: "helm kube-stack operator standalone agent kubernetes privileged", + steps: []k8sTestStep{ + k8sStepCreateNamespace(), + k8sStepHelmDeployWithValueOptions(chartLocation, "kube-stack-otel", + values.Options{ + ValueFiles: []string{"../../deploy/helm/edot-collector/kube-stack/values.yaml"}, + Values: []string{fmt.Sprintf("defaultCRConfig.image.repository=%s", kCtx.agentImageRepo), fmt.Sprintf("defaultCRConfig.image.tag=%s", kCtx.agentImageTag)}, + + // override secrets reference with env variables + JSONValues: []string{ + fmt.Sprintf(`collectors.gateway.env[1]={"name":"ELASTIC_ENDPOINT","value":"%s"}`, kCtx.esHost), + fmt.Sprintf(`collectors.gateway.env[2]={"name":"ELASTIC_API_KEY","value":"%s"}`, kCtx.esAPIKey), + }, + }, + ), + // - An OpenTelemetry Operator Deployment (1 pod per + // cluster) + k8sStepCheckRunningPods("app.kubernetes.io/name=opentelemetry-operator", 1, "manager"), + // - A Daemonset to collect K8s node's metrics and logs + // (1 EDOT collector pod per node) + // - A Cluster wide Deployment to collect K8s metrics and + // events (1 EDOT collector pod per 
cluster) + // - Two Gateway pods to collect, aggregate and forward + // telemetry. + k8sStepCheckRunningPods("app.kubernetes.io/managed-by=opentelemetry-operator", 4, "otc-container"), + }, }, } @@ -79,82 +92,57 @@ func TestOtelKubeStackHelm(t *testing.T) { ctx := context.Background() testNamespace := kCtx.getNamespace(t) - settings := cli.New() - settings.SetNamespace(testNamespace) - actionConfig := &action.Configuration{} - - helmChart, err := loader.Load(chartLocation) - require.NoError(t, err, "failed to load helm chart") + for _, step := range tc.steps { + step(t, ctx, kCtx, testNamespace) + } + }) + } +} - err = actionConfig.Init(settings.RESTClientGetter(), settings.Namespace(), "", - func(format string, v ...interface{}) {}) - require.NoError(t, err, "failed to init helm action config") +func k8sStepHelmDeployWithValueOptions(chartPath string, releaseName string, values values.Options) k8sTestStep { + return func(t *testing.T, ctx context.Context, kCtx k8sContext, namespace string) { + // Initialize a map to hold the parsed data + helmValues := make(map[string]any) - // Initialize a map to hold the parsed data - helmValues := make(map[string]any) + settings := cli.New() + settings.SetNamespace(namespace) + providers := getter.All(settings) + helmValues, err := values.MergeValues(providers) + if err != nil { + require.NoError(t, err, "failed to helm values") + } - options := values.Options{ - ValueFiles: []string{tc.valuesFile}, - Values: []string{fmt.Sprintf("defaultCRConfig.image.repository=%s", kCtx.agentImageRepo), fmt.Sprintf("defaultCRConfig.image.tag=%s", kCtx.agentImageTag)}, + k8sStepHelmDeploy(chartPath, releaseName, helmValues)(t, ctx, kCtx, namespace) + } +} - // override secrets reference with env variables - JSONValues: []string{ - fmt.Sprintf(`collectors.cluster.env[1]={"name":"ELASTIC_ENDPOINT","value":"%s"}`, kCtx.esHost), - fmt.Sprintf(`collectors.cluster.env[2]={"name":"ELASTIC_API_KEY","value":"%s"}`, kCtx.esAPIKey), - 
fmt.Sprintf(`collectors.daemon.env[2]={"name":"ELASTIC_ENDPOINT","value":"%s"}`, kCtx.esHost), - fmt.Sprintf(`collectors.daemon.env[3]={"name":"ELASTIC_API_KEY","value":"%s"}`, kCtx.esAPIKey), - }, - } - providers := getter.All(settings) - helmValues, err = options.MergeValues(providers) - if err != nil { - require.NoError(t, err, "failed to helm values") - } +// k8sStepCheckRunningPods checks the status of the agent inside the pods returned by the selector +func k8sStepCheckRunningPods(podLabelSelector string, expectedPodNumber int, containerName string) k8sTestStep { + return func(t *testing.T, ctx context.Context, kCtx k8sContext, namespace string) { + require.Eventually(t, func() bool { + perNodePodList := &corev1.PodList{} + err := kCtx.client.Resources(namespace).List(ctx, perNodePodList, func(opt *metav1.ListOptions) { + opt.LabelSelector = podLabelSelector + }) + require.NoError(t, err, "failed to list pods with selector ", perNodePodList) + checkedAgentContainers := 0 - t.Cleanup(func() { - if t.Failed() { - if err := k8sDumpAllPodLogs(ctx, kCtx.client, testNamespace, testNamespace, kCtx.logsBasePath); err != nil { - t.Logf("failed to dump logs: %s", err) - } + for _, pod := range perNodePodList.Items { + if pod.Status.Phase != corev1.PodRunning { + continue } - uninstallAction := action.NewUninstall(actionConfig) - uninstallAction.Wait = true - - _, err = uninstallAction.Run(tc.helmReleaseName) - if err != nil { - require.NoError(t, err, "failed to uninstall helm chart") - } - }) + for _, container := range pod.Status.ContainerStatuses { + if container.Name != containerName { + continue + } - installAction := action.NewInstall(actionConfig) - installAction.Namespace = testNamespace - installAction.CreateNamespace = true - installAction.UseReleaseName = true - installAction.ReleaseName = tc.helmReleaseName - installAction.Timeout = 2 * time.Minute - installAction.Wait = true - installAction.WaitForJobs = true - _, err = installAction.Run(helmChart, 
helmValues) - require.NoError(t, err, "failed to install helm chart") - - // Pods are created by the OpenTelemetry Operator, it - // takes some time for the OpenTelemetry Operator to be - // ready - require.Eventually(t, func() bool { - podList := &corev1.PodList{} - err = kCtx.client.Resources(testNamespace).List(ctx, podList) - require.NoError(t, err, fmt.Sprintf("failed to list pods in namespace %s", testNamespace)) - - checkedAgentContainers := 0 - - for _, pod := range podList.Items { - if strings.HasPrefix(pod.GetName(), tc.helmReleaseName) && pod.Status.Phase == corev1.PodRunning { + if container.RestartCount == 0 && container.State.Running != nil { checkedAgentContainers++ } } - return checkedAgentContainers >= tc.atLeastValidatedPodsNumber - }, 5*time.Minute, 10*time.Second, fmt.Sprintf("at least %d agent containers should be checked", tc.atLeastValidatedPodsNumber)) - }) + } + return checkedAgentContainers >= expectedPodNumber + }, 5*time.Minute, 10*time.Second, fmt.Sprintf("at least %d agent containers should be checked", expectedPodNumber)) } }