From c1d42aef0869d194484d9113dcbac2a4d8fec0bf Mon Sep 17 00:00:00 2001 From: Roger Coll Date: Fri, 21 Feb 2025 17:20:26 +0100 Subject: [PATCH 1/6] [Managed OTLP] Add Host/K8s onboarding configurations (#6630) * feat: remove apm-processing from gateway collector * remove elasticsearch exporter in favor of managed otlp * update darwin configurations * refactor: move motel values to motel dir * add ApiKey prefix for motel exporter headers * refactor: move motel configurations to motel subdir * test: add motel configuration integration test case * chore: add fragments entry * refactor: use managed_otlp directory * fix: mock a valid ingest endpoint url * chore: remove MOtel in favor of Managed OTLP Input * increase gateway resource limits * use ELASTIC_OTLP_ENDPOINT as secret and env variable * never split metrics requests in batchprocessor * update env variable on host configuration * gateway: use autoscaler instead of fixed replicas --- ...44282-add-MOtel-sample-configurations.yaml | 32 + .../kube-stack/managed_otlp/values.yaml | 615 ++++++++++++++++++ .../managed_otlp/logs_metrics_traces.yml | 112 ++++ .../darwin/managed_otlp/platformlogs.yml | 67 ++ .../managed_otlp/platformlogs_hostmetrics.yml | 91 +++ .../managed_otlp/logs_metrics_traces.yml | 119 ++++ .../linux/managed_otlp/platformlogs.yml | 67 ++ .../managed_otlp/platformlogs_hostmetrics.yml | 98 +++ magefile.go | 13 +- testing/integration/otel_helm_test.go | 31 +- 10 files changed, 1240 insertions(+), 5 deletions(-) create mode 100644 changelog/fragments/1739544282-add-MOtel-sample-configurations.yaml create mode 100644 deploy/helm/edot-collector/kube-stack/managed_otlp/values.yaml create mode 100644 internal/pkg/otel/samples/darwin/managed_otlp/logs_metrics_traces.yml create mode 100644 internal/pkg/otel/samples/darwin/managed_otlp/platformlogs.yml create mode 100644 internal/pkg/otel/samples/darwin/managed_otlp/platformlogs_hostmetrics.yml create mode 100644 
internal/pkg/otel/samples/linux/managed_otlp/logs_metrics_traces.yml create mode 100644 internal/pkg/otel/samples/linux/managed_otlp/platformlogs.yml create mode 100644 internal/pkg/otel/samples/linux/managed_otlp/platformlogs_hostmetrics.yml diff --git a/changelog/fragments/1739544282-add-MOtel-sample-configurations.yaml b/changelog/fragments/1739544282-add-MOtel-sample-configurations.yaml new file mode 100644 index 00000000000..94583c945f3 --- /dev/null +++ b/changelog/fragments/1739544282-add-MOtel-sample-configurations.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: add MOtel sample configurations + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: "elastic-agent" + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. 
+# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/6630 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/deploy/helm/edot-collector/kube-stack/managed_otlp/values.yaml b/deploy/helm/edot-collector/kube-stack/managed_otlp/values.yaml new file mode 100644 index 00000000000..e00aa891755 --- /dev/null +++ b/deploy/helm/edot-collector/kube-stack/managed_otlp/values.yaml @@ -0,0 +1,615 @@ +# For installation and configuration options, refer to the [installation instructions](https://github.com/elastic/opentelemetry/blob/main/docs/kubernetes/operator/README.md) + +# For advanced configuration options, refer to the [official OpenTelemetry Helm chart](https://github.com/open-telemetry/opentelemetry-helm-charts/blob/main/charts/opentelemetry-kube-stack/values.yaml) +# This file has been tested together with opentelemetry-kube-stack helm chart version: 0.3.3 +opentelemetry-operator: + manager: + extraArgs: + - --enable-go-instrumentation + admissionWebhooks: + certManager: + enabled: false # For production environments, it is [recommended to use cert-manager for better security and scalability](https://github.com/open-telemetry/opentelemetry-helm-charts/tree/main/charts/opentelemetry-operator#tls-certificate-requirement). + autoGenerateCert: + enabled: true # Enable/disable automatic certificate generation. Set to false if manually managing certificates. + recreate: true # Force certificate regeneration on updates. Only applicable if autoGenerateCert.enabled is true. +crds: + create: true # Install the OpenTelemetry Operator CRDs. 
+defaultCRConfig: + image: + repository: "docker.elastic.co/beats/elastic-agent" + tag: "9.1.0" + targetAllocator: + enabled: false # Enable/disable the Operator's Target allocator. + # Refer to: https://github.com/open-telemetry/opentelemetry-operator/tree/main/cmd/otel-allocator +clusterRole: + rules: + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get"] +# `clusterName` specifies the name of the Kubernetes cluster. It sets the 'k8s.cluster.name' field. +# Cluster Name is automatically detected for EKS/GKE/AKS. Add the below value in environments where cluster name cannot be detected. +# clusterName: myClusterName +collectors: + # Cluster is a K8s deployment EDOT collector focused on gathering telemetry + # at the cluster level (Kubernetes Events and cluster metrics). + cluster: + env: + - name: ELASTIC_AGENT_OTEL + value: '"true"' + config: + exporters: + # [Debug exporter](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/debugexporter/README.md) + debug: + verbosity: basic # Options: basic, detailed. Choose verbosity level for debug logs. + # [Elasticsearch exporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/elasticsearchexporter/README.md) + otlp/gateway: + endpoint: "http://opentelemetry-kube-stack-gateway-collector:4317" + tls: + insecure: true + processors: + # [Resource Detection Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourcedetectionprocessor) + resourcedetection/eks: + detectors: [env, eks] # Detects resources from environment variables and EKS (Elastic Kubernetes Service). + timeout: 15s + override: true + eks: + resource_attributes: + k8s.cluster.name: + enabled: true + resourcedetection/gcp: + detectors: [env, gcp] # Detects resources from environment variables and GCP (Google Cloud Platform). 
+ timeout: 2s + override: true + resourcedetection/aks: + detectors: [env, aks] # Detects resources from environment variables and AKS (Azure Kubernetes Service). + timeout: 2s + override: true + aks: + resource_attributes: + k8s.cluster.name: + enabled: true + # [Resource Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourceprocessor) + resource/k8s: # Resource attributes tailored for services within Kubernetes. + attributes: + - key: service.name # Set the service.name resource attribute based on the well-known app.kubernetes.io/name label + from_attribute: app.label.name + action: insert + - key: service.name # Set the service.name resource attribute based on the k8s.container.name attribute + from_attribute: k8s.container.name + action: insert + - key: app.label.name # Delete app.label.name attribute previously used for service.name + action: delete + - key: service.version # Set the service.version resource attribute based on the well-known app.kubernetes.io/version label + from_attribute: app.label.version + action: insert + - key: app.label.version # Delete app.label.version attribute previously used for service.version + action: delete + resource/hostname: + attributes: + - key: host.name + from_attribute: k8s.node.name + action: upsert + # [K8s Attributes Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/k8sattributesprocessor) + k8sattributes: + passthrough: false # Annotates resources with the pod IP and does not try to extract any other metadata. + pod_association: + # Below association takes a look at the k8s.pod.ip and k8s.pod.uid resource attributes or connection's context, and tries to match it with the pod having the same attribute. 
+ - sources: + - from: resource_attribute + name: k8s.pod.ip + - sources: + - from: resource_attribute + name: k8s.pod.uid + - sources: + - from: connection + extract: + metadata: + - "k8s.namespace.name" + - "k8s.deployment.name" + - "k8s.replicaset.name" + - "k8s.statefulset.name" + - "k8s.daemonset.name" + - "k8s.cronjob.name" + - "k8s.job.name" + - "k8s.node.name" + - "k8s.pod.name" + - "k8s.pod.ip" + - "k8s.pod.uid" + - "k8s.pod.start_time" + labels: + - tag_name: app.label.name + key: app.kubernetes.io/name + from: pod + - tag_name: app.label.version + key: app.kubernetes.io/version + from: pod + receivers: + # [K8s Objects Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/k8sobjectsreceiver) + k8sobjects: + objects: + - name: events + mode: "watch" + group: "events.k8s.io" + exclude_watch_type: + - "DELETED" + # [K8s Cluster Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/k8sclusterreceiver) + k8s_cluster: + auth_type: serviceAccount # Determines how to authenticate to the K8s API server. This can be one of none (for no auth), serviceAccount (to use the standard service account token provided to the agent pod), or kubeConfig to use credentials from ~/.kube/config. 
+ node_conditions_to_report: + - Ready + - MemoryPressure + allocatable_types_to_report: + - cpu + - memory + metrics: + k8s.pod.status_reason: + enabled: true + resource_attributes: + k8s.kubelet.version: + enabled: true + os.description: + enabled: true + os.type: + enabled: true + k8s.container.status.last_terminated_reason: + enabled: true + # [Service Section](https://opentelemetry.io/docs/collector/configuration/#service) + service: + pipelines: + metrics: + exporters: + - debug + - otlp/gateway + processors: + - k8sattributes + - resourcedetection/eks + - resourcedetection/gcp + - resourcedetection/aks + - resource/k8s + - resource/hostname + receivers: + - k8s_cluster + logs: + receivers: + - k8sobjects + processors: + - resourcedetection/eks + - resourcedetection/gcp + - resourcedetection/aks + - resource/hostname + exporters: + - debug + - otlp/gateway + # Daemon is a K8s daemonset EDOT collector focused on gathering telemetry at + # node level and exposing an OTLP endpoint for data ingestion. + # Auto-instrumentation SDKs will use this endpoint. + daemon: + env: + # Work around for open /mounts error: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35990 + - name: HOST_PROC_MOUNTINFO + value: "" + - name: ELASTIC_AGENT_OTEL + value: '"true"' + presets: + logsCollection: + enabled: true # Enable/disable the collection of node's logs. + storeCheckpoints: true # Store checkpoints for log collection, allowing for resumption from the last processed log. + hostNetwork: true # Use the host's network namespace. This allows the daemon to access the network interfaces of the host directly. + securityContext: # Run the daemon as the root user and group for proper metrics collection. 
+ runAsUser: 0 + runAsGroup: 0 + scrape_configs_file: "" # [Prometheus metrics](https://github.com/open-telemetry/opentelemetry-helm-charts/tree/main/charts/opentelemetry-kube-stack#scrape_configs_file-details) + config: + exporters: + # [Debug exporter](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/debugexporter/README.md) + debug: + verbosity: basic + otlp/gateway: + endpoint: "http://opentelemetry-kube-stack-gateway-collector-headless:4317" + tls: + insecure: true + processors: + # [Batch Processor](https://github.com/open-telemetry/opentelemetry-collector/tree/main/processor/batchprocessor) + batch: {} + batch/metrics: + # explicitly set send_batch_max_size to 0, as splitting metrics requests may cause version_conflict_engine_exception in TSDB + send_batch_max_size: 0 + timeout: 1s + # [Resource Detection Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourcedetectionprocessor) + resourcedetection/eks: + detectors: [env, eks] # Detects resources from environment variables and EKS (Elastic Kubernetes Service). + timeout: 15s + override: true + eks: + resource_attributes: + k8s.cluster.name: + enabled: true + resourcedetection/gcp: + detectors: [env, gcp] # Detects resources from environment variables and GCP (Google Cloud Platform). + timeout: 2s + override: true + resourcedetection/aks: + detectors: [env, aks] # Detects resources from environment variables and AKS (Azure Kubernetes Service). + timeout: 2s + override: true + aks: + resource_attributes: + k8s.cluster.name: + enabled: true + resource/hostname: + attributes: + - key: host.name + from_attribute: k8s.node.name + action: upsert + resourcedetection/system: + detectors: ["system", "ec2"] # Detects resources from the system and EC2 instances. 
+ system: + hostname_sources: ["os"] + resource_attributes: + host.name: + enabled: true + host.id: + enabled: false + host.arch: + enabled: true + host.ip: + enabled: true + host.mac: + enabled: true + host.cpu.vendor.id: + enabled: true + host.cpu.family: + enabled: true + host.cpu.model.id: + enabled: true + host.cpu.model.name: + enabled: true + host.cpu.stepping: + enabled: true + host.cpu.cache.l2.size: + enabled: true + os.description: + enabled: true + os.type: + enabled: true + ec2: + resource_attributes: + host.name: + enabled: false + host.id: + enabled: true + # [Resource Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/resourceprocessor) + resource/k8s: # Resource attributes tailored for services within Kubernetes. + attributes: + - key: service.name # Set the service.name resource attribute based on the well-known app.kubernetes.io/name label + from_attribute: app.label.name + action: insert + - key: service.name # Set the service.name resource attribute based on the k8s.container.name attribute + from_attribute: k8s.container.name + action: insert + - key: app.label.name # Delete app.label.name attribute previously used for service.name + action: delete + - key: service.version # Set the service.version resource attribute based on the well-known app.kubernetes.io/version label + from_attribute: app.label.version + action: insert + - key: app.label.version # Delete app.label.version attribute previously used for service.version + action: delete + resource/cloud: + attributes: + - key: cloud.instance.id + from_attribute: host.id + action: insert + # [K8s Attributes Processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/k8sattributesprocessor) + k8sattributes: + filter: + # Only retrieve pods running on the same node as the collector + node_from_env_var: OTEL_K8S_NODE_NAME + passthrough: false + pod_association: + # Below association takes a look at the k8s.pod.ip 
 and k8s.pod.uid resource attributes or connection's context, and tries to match it with the pod having the same attribute. + - sources: + - from: resource_attribute + name: k8s.pod.ip + - sources: + - from: resource_attribute + name: k8s.pod.uid + - sources: + - from: connection + extract: + metadata: + - "k8s.namespace.name" + - "k8s.deployment.name" + - "k8s.replicaset.name" + - "k8s.statefulset.name" + - "k8s.daemonset.name" + - "k8s.cronjob.name" + - "k8s.job.name" + - "k8s.node.name" + - "k8s.pod.name" + - "k8s.pod.ip" + - "k8s.pod.uid" + - "k8s.pod.start_time" + labels: + - tag_name: app.label.name + key: app.kubernetes.io/name + from: pod + - tag_name: app.label.version + key: app.kubernetes.io/version + from: pod + receivers: + # [OTLP Receiver](https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver/otlpreceiver) + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + # [File Log Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/filelogreceiver) + filelog: + retry_on_failure: + enabled: true + start_at: end + exclude: + # exclude collector logs + - /var/log/pods/*opentelemetry-kube-stack*/*/*.log + include: + - /var/log/pods/*/*/*.log + include_file_name: false + include_file_path: true + operators: + - id: container-parser # Extract container's metadata + type: container + # [Hostmetrics Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/hostmetricsreceiver) + hostmetrics: + collection_interval: 10s + root_path: /hostfs # Mounted node's root file system + scrapers: + cpu: + metrics: + system.cpu.utilization: + enabled: true + system.cpu.logical.count: + enabled: true + memory: + metrics: + system.memory.utilization: + enabled: true + process: + mute_process_exe_error: true + mute_process_io_error: true + mute_process_user_error: true + metrics: + process.threads: + enabled: true + process.open_file_descriptors: + 
enabled: true + process.memory.utilization: + enabled: true + process.disk.operations: + enabled: true + network: {} + processes: {} + load: {} + disk: {} + filesystem: + exclude_mount_points: + mount_points: + - /dev/* + - /proc/* + - /sys/* + - /run/k3s/containerd/* + - /var/lib/docker/* + - /var/lib/kubelet/* + - /snap/* + match_type: regexp + exclude_fs_types: + fs_types: + - autofs + - binfmt_misc + - bpf + - cgroup2 + - configfs + - debugfs + - devpts + - devtmpfs + - fusectl + - hugetlbfs + - iso9660 + - mqueue + - nsfs + - overlay + - proc + - procfs + - pstore + - rpc_pipefs + - securityfs + - selinuxfs + - squashfs + - sysfs + - tracefs + match_type: strict + # [Kubelet Stats Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/kubeletstatsreceiver) + kubeletstats: + auth_type: serviceAccount # Authentication mechanism with the Kubelet endpoint, refer to: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/kubeletstatsreceiver#configuration + collection_interval: 20s + endpoint: ${env:OTEL_K8S_NODE_NAME}:10250 + node: "${env:OTEL_K8S_NODE_NAME}" + # Required to work for all CSPs without an issue + insecure_skip_verify: true + k8s_api_config: + auth_type: serviceAccount + metrics: + k8s.pod.memory.node.utilization: + enabled: true + k8s.pod.cpu.node.utilization: + enabled: true + k8s.container.cpu_limit_utilization: + enabled: true + k8s.pod.cpu_limit_utilization: + enabled: true + k8s.container.cpu_request_utilization: + enabled: true + k8s.container.memory_limit_utilization: + enabled: true + k8s.pod.memory_limit_utilization: + enabled: true + k8s.container.memory_request_utilization: + enabled: true + k8s.node.uptime: + enabled: true + k8s.node.cpu.usage: + enabled: true + k8s.pod.cpu.usage: + enabled: true + extra_metadata_labels: + - container.id + # [Service Section](https://opentelemetry.io/docs/collector/configuration/#service) + service: + pipelines: + logs/node: + 
receivers: + - filelog + processors: + - batch + - k8sattributes + - resourcedetection/system + - resourcedetection/eks + - resourcedetection/gcp + - resourcedetection/aks + - resource/k8s + - resource/hostname + - resource/cloud + exporters: + - otlp/gateway + metrics/node/otel: + receivers: + - kubeletstats + - hostmetrics + processors: + - batch/metrics + - k8sattributes + - resourcedetection/system + - resourcedetection/eks + - resourcedetection/gcp + - resourcedetection/aks + - resource/k8s + - resource/hostname + - resource/cloud + exporters: + # - debug + - otlp/gateway + metrics/otel-apm: + receivers: + - otlp + processors: + - batch/metrics + - resource/hostname + exporters: + - otlp/gateway + logs/apm: + receivers: + - otlp + processors: + - batch + - resource/hostname + exporters: + - otlp/gateway + traces/apm: + receivers: + - otlp + processors: + - batch + - resource/hostname + exporters: + - otlp/gateway + # Gateway is a K8s deployment EDOT collector focused on processing and + # forwarding telemetry to an Elasticsearch endpoint. + gateway: + suffix: gateway + autoscaler: + minReplicas: 2 # Start with at least 2 replicas for better availability. + maxReplicas: 5 # Allow more scale-out if needed. + targetCPUUtilization: 70 # Scale when CPU usage exceeds 70%. + targetMemoryUtilization: 75 # Scale when memory usage exceeds 75%. 
+ resources: + limits: + cpu: 500m + memory: 1000Mi + requests: + cpu: 100m + memory: 500Mi + enabled: true + env: + - name: ELASTIC_AGENT_OTEL + value: '"true"' + - name: ELASTIC_OTLP_ENDPOINT + valueFrom: + secretKeyRef: + name: elastic-secret-otel + key: elastic_otlp_endpoint + - name: ELASTIC_API_KEY + valueFrom: + secretKeyRef: + name: elastic-secret-otel + key: elastic_api_key + config: + receivers: + otlp: + protocols: + grpc: + endpoint: ${env:MY_POD_IP}:4317 + http: + endpoint: ${env:MY_POD_IP}:4318 + processors: + batch: + send_batch_size: 1000 + timeout: 1s + send_batch_max_size: 1500 + batch/metrics: + # explicitly set send_batch_max_size to 0, as splitting metrics requests may cause version_conflict_engine_exception in TSDB + send_batch_max_size: 0 + timeout: 1s + exporters: + debug: + otlp/ingest: + endpoint: ${env:ELASTIC_OTLP_ENDPOINT} + headers: + Authorization: ApiKey ${env:ELASTIC_API_KEY} + service: + pipelines: + metrics: + receivers: [otlp] + processors: [batch/metrics] + exporters: [debug, otlp/ingest] + logs: + receivers: [otlp] + processors: [batch] + exporters: [debug, otlp/ingest] + traces: + receivers: [otlp] + processors: [batch] + exporters: [debug, otlp/ingest] +# For more details on OpenTelemetry's zero-code instrumentation, see: +# https://opentelemetry.io/docs/concepts/instrumentation/zero-code/ +instrumentation: + name: elastic-instrumentation + enabled: true # Enable/disable auto-instrumentation. + exporter: + endpoint: http://opentelemetry-kube-stack-daemon-collector.opentelemetry-operator-system.svc.cluster.local:4318 # The daemonset OpenTelemetry Collector endpoint where telemetry data will be exported. + propagators: + - tracecontext # W3C TraceContext propagator for distributed tracing. + - baggage # Baggage propagator to include baggage information in trace context. + - b3 # B3 propagator for Zipkin-based distributed tracing compatibility. 
+ sampler: + type: parentbased_traceidratio # Sampler type + argument: "1.0" # Sampling rate set to 100% (all traces are sampled). + java: + image: docker.elastic.co/observability/elastic-otel-javaagent:1.0.0 + nodejs: + image: docker.elastic.co/observability/elastic-otel-node:0.4.1 + dotnet: + image: docker.elastic.co/observability/elastic-otel-dotnet:edge + python: + image: docker.elastic.co/observability/elastic-otel-python:0.3.0 + go: + image: ghcr.io/open-telemetry/opentelemetry-go-instrumentation/autoinstrumentation-go:v0.14.0-alpha diff --git a/internal/pkg/otel/samples/darwin/managed_otlp/logs_metrics_traces.yml b/internal/pkg/otel/samples/darwin/managed_otlp/logs_metrics_traces.yml new file mode 100644 index 00000000000..f3402f29ff3 --- /dev/null +++ b/internal/pkg/otel/samples/darwin/managed_otlp/logs_metrics_traces.yml @@ -0,0 +1,112 @@ +receivers: + # Receiver for platform specific log files + filelog/platformlogs: + include: [ /var/log/*.log ] + retry_on_failure: + enabled: true + start_at: end + storage: file_storage +# start_at: beginning + + # Receiver for CPU, Disk, Memory, and Filesystem metrics + hostmetrics/system: + collection_interval: 30s + scrapers: + filesystem: + memory: + metrics: + system.memory.utilization: + enabled: true + process: + mute_process_exe_error: true + mute_process_io_error: true + mute_process_user_error: true + metrics: + process.threads: + enabled: true + process.open_file_descriptors: + enabled: true + process.memory.utilization: + enabled: true + process.disk.operations: + enabled: true + network: + processes: + load: + + # Receiver for logs, traces, and metrics from SDKs + otlp/fromsdk: + protocols: + grpc: + http: + +extensions: + file_storage: + directory: ${env:STORAGE_DIR} + +processors: + resourcedetection: + detectors: ["system"] + system: + hostname_sources: ["os"] + resource_attributes: + host.name: + enabled: true + host.id: + enabled: false + host.arch: + enabled: true + host.ip: + enabled: true + host.mac: 
+ enabled: true + host.cpu.vendor.id: + enabled: true + host.cpu.family: + enabled: true + host.cpu.model.id: + enabled: true + host.cpu.model.name: + enabled: true + host.cpu.stepping: + enabled: true + host.cpu.cache.l2.size: + enabled: true + os.description: + enabled: true + os.type: + enabled: true + +exporters: + otlp/ingest: + endpoint: ${env:ELASTIC_OTLP_ENDPOINT} + headers: + Authorization: ${env:ELASTIC_API_KEY} + +service: + extensions: [file_storage] + pipelines: + traces/fromsdk: + receivers: [otlp/fromsdk] + processors: [] + exporters: [otlp/ingest] + + metrics/fromsdk: + receivers: [otlp/fromsdk] + processors: [] + exporters: [otlp/ingest] + + logs/fromsdk: + receivers: [otlp/fromsdk] + processors: [] + exporters: [otlp/ingest] + + metrics/hostmetrics: + receivers: [hostmetrics/system] + processors: [resourcedetection] + exporters: [otlp/ingest] + + logs/platformlogs: + receivers: [filelog/platformlogs] + processors: [resourcedetection] + exporters: [otlp/ingest] diff --git a/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs.yml b/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs.yml new file mode 100644 index 00000000000..b4be582a6a9 --- /dev/null +++ b/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs.yml @@ -0,0 +1,67 @@ +receivers: + # Receiver for platform specific log files + filelog/platformlogs: + include: [ /var/log/*.log ] + retry_on_failure: + enabled: true + start_at: end + storage: file_storage +# start_at: beginning + +extensions: + file_storage: + directory: ${env:STORAGE_DIR} + +processors: + resourcedetection: + detectors: ["system"] + system: + hostname_sources: ["os"] + resource_attributes: + host.name: + enabled: true + host.id: + enabled: false + host.arch: + enabled: true + host.ip: + enabled: true + host.mac: + enabled: true + host.cpu.vendor.id: + enabled: true + host.cpu.family: + enabled: true + host.cpu.model.id: + enabled: true + host.cpu.model.name: + enabled: true + host.cpu.stepping: + 
enabled: true + host.cpu.cache.l2.size: + enabled: true + os.description: + enabled: true + os.type: + enabled: true + +exporters: + # Exporter to print the first 5 logs/metrics and then every 1000th + debug: + verbosity: detailed + sampling_initial: 5 + sampling_thereafter: 1000 + +# Exporter to send logs and metrics to Elasticsearch Managed OTLP Input + otlp/ingest: + endpoint: ${env:ELASTIC_OTLP_ENDPOINT} + headers: + Authorization: ${env:ELASTIC_API_KEY} + +service: + extensions: [file_storage] + pipelines: + logs/platformlogs: + receivers: [filelog/platformlogs] + processors: [resourcedetection] + exporters: [debug, otlp/ingest] diff --git a/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs_hostmetrics.yml b/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs_hostmetrics.yml new file mode 100644 index 00000000000..3911102e5b2 --- /dev/null +++ b/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs_hostmetrics.yml @@ -0,0 +1,91 @@ +receivers: + # Receiver for platform specific log files + filelog/platformlogs: + include: [ /var/log/*.log ] + retry_on_failure: + enabled: true + start_at: end + storage: file_storage +# start_at: beginning + + # Receiver for CPU, Disk, Memory, and Filesystem metrics + hostmetrics/system: + collection_interval: 30s + scrapers: + filesystem: + memory: + metrics: + system.memory.utilization: + enabled: true + process: + mute_process_exe_error: true + mute_process_io_error: true + mute_process_user_error: true + metrics: + process.threads: + enabled: true + process.open_file_descriptors: + enabled: true + process.memory.utilization: + enabled: true + process.disk.operations: + enabled: true + network: + processes: + load: + +extensions: + file_storage: + directory: ${env:STORAGE_DIR} + +processors: + resourcedetection: + detectors: ["system"] + system: + hostname_sources: ["os"] + resource_attributes: + host.name: + enabled: true + host.id: + enabled: false + host.arch: + enabled: true + host.ip: + enabled: true 
+ host.mac: + enabled: true + host.cpu.vendor.id: + enabled: true + host.cpu.family: + enabled: true + host.cpu.model.id: + enabled: true + host.cpu.model.name: + enabled: true + host.cpu.stepping: + enabled: true + host.cpu.cache.l2.size: + enabled: true + os.description: + enabled: true + os.type: + enabled: true + +exporters: + # Exporter to send logs and metrics to Elasticsearch Managed OTLP Input + otlp/ingest: + endpoint: ${env:ELASTIC_OTLP_ENDPOINT} + headers: + Authorization: ${env:ELASTIC_API_KEY} + +service: + extensions: [file_storage] + pipelines: + metrics/hostmetrics: + receivers: [hostmetrics/system] + processors: [resourcedetection] + exporters: [otlp/ingest] + logs/platformlogs: + receivers: [filelog/platformlogs] + processors: [resourcedetection] + exporters: [otlp/ingest] diff --git a/internal/pkg/otel/samples/linux/managed_otlp/logs_metrics_traces.yml b/internal/pkg/otel/samples/linux/managed_otlp/logs_metrics_traces.yml new file mode 100644 index 00000000000..62e74ea6c24 --- /dev/null +++ b/internal/pkg/otel/samples/linux/managed_otlp/logs_metrics_traces.yml @@ -0,0 +1,119 @@ +receivers: + # Receiver for platform specific log files + filelog/platformlogs: + include: [/var/log/*.log] + retry_on_failure: + enabled: true + start_at: end + storage: file_storage + # start_at: beginning + + # Receiver for CPU, Disk, Memory, and Filesystem metrics + hostmetrics/system: + collection_interval: 30s + scrapers: + disk: + filesystem: + cpu: + metrics: + system.cpu.utilization: + enabled: true + system.cpu.logical.count: + enabled: true + memory: + metrics: + system.memory.utilization: + enabled: true + process: + mute_process_exe_error: true + mute_process_io_error: true + mute_process_user_error: true + metrics: + process.threads: + enabled: true + process.open_file_descriptors: + enabled: true + process.memory.utilization: + enabled: true + process.disk.operations: + enabled: true + network: + processes: + load: + + # Receiver for logs, traces, and 
metrics from SDKs + otlp/fromsdk: + protocols: + grpc: + http: + +extensions: + file_storage: + directory: ${env:STORAGE_DIR} + +processors: + resourcedetection: + detectors: ["system"] + system: + hostname_sources: ["os"] + resource_attributes: + host.name: + enabled: true + host.id: + enabled: false + host.arch: + enabled: true + host.ip: + enabled: true + host.mac: + enabled: true + host.cpu.vendor.id: + enabled: true + host.cpu.family: + enabled: true + host.cpu.model.id: + enabled: true + host.cpu.model.name: + enabled: true + host.cpu.stepping: + enabled: true + host.cpu.cache.l2.size: + enabled: true + os.description: + enabled: true + os.type: + enabled: true + +exporters: + otlp/ingest: + endpoint: ${env:ELASTIC_OTLP_ENDPOINT} + headers: + Authorization: ${env:ELASTIC_API_KEY} + +service: + extensions: [file_storage] + pipelines: + traces/fromsdk: + receivers: [otlp/fromsdk] + processors: [] + exporters: [otlp/ingest] + + metrics/fromsdk: + receivers: [otlp/fromsdk] + processors: [] + exporters: [otlp/ingest] + + logs/fromsdk: + receivers: [otlp/fromsdk] + processors: [] + exporters: [otlp/ingest] + + metrics/hostmetrics: + receivers: [hostmetrics/system] + processors: [resourcedetection] + exporters: [otlp/ingest] + + logs/platformlogs: + receivers: [filelog/platformlogs] + processors: [resourcedetection] + exporters: [otlp/ingest] diff --git a/internal/pkg/otel/samples/linux/managed_otlp/platformlogs.yml b/internal/pkg/otel/samples/linux/managed_otlp/platformlogs.yml new file mode 100644 index 00000000000..6fb3bc50f61 --- /dev/null +++ b/internal/pkg/otel/samples/linux/managed_otlp/platformlogs.yml @@ -0,0 +1,67 @@ +receivers: + # Receiver for platform specific log files + filelog/platformlogs: + include: [/var/log/*.log] + retry_on_failure: + enabled: true + start_at: end + storage: file_storage +# start_at: beginning + +extensions: + file_storage: + directory: ${env:STORAGE_DIR} + +processors: + resourcedetection: + detectors: ["system"] + system: + 
hostname_sources: ["os"] + resource_attributes: + host.name: + enabled: true + host.id: + enabled: false + host.arch: + enabled: true + host.ip: + enabled: true + host.mac: + enabled: true + host.cpu.vendor.id: + enabled: true + host.cpu.family: + enabled: true + host.cpu.model.id: + enabled: true + host.cpu.model.name: + enabled: true + host.cpu.stepping: + enabled: true + host.cpu.cache.l2.size: + enabled: true + os.description: + enabled: true + os.type: + enabled: true + +exporters: + # Exporter to print the first 5 logs/metrics and then every 1000th + debug: + verbosity: detailed + sampling_initial: 5 + sampling_thereafter: 1000 + + # Exporter to send logs and metrics to Elasticsearch Managed OTLP Input + otlp/ingest: + endpoint: ${env:ELASTIC_OTLP_ENDPOINT} + headers: + Authorization: ${env:ELASTIC_API_KEY} + +service: + extensions: [file_storage] + pipelines: + logs/platformlogs: + receivers: [filelog/platformlogs] + processors: [resourcedetection] + exporters: [debug, otlp/ingest] diff --git a/internal/pkg/otel/samples/linux/managed_otlp/platformlogs_hostmetrics.yml b/internal/pkg/otel/samples/linux/managed_otlp/platformlogs_hostmetrics.yml new file mode 100644 index 00000000000..a9f0aeb9c51 --- /dev/null +++ b/internal/pkg/otel/samples/linux/managed_otlp/platformlogs_hostmetrics.yml @@ -0,0 +1,98 @@ +receivers: + # Receiver for platform specific log files + filelog/platformlogs: + include: [/var/log/*.log] + retry_on_failure: + enabled: true + start_at: end + storage: file_storage + # start_at: beginning + + # Receiver for CPU, Disk, Memory, and Filesystem metrics + hostmetrics/system: + collection_interval: 30s + scrapers: + disk: + filesystem: + cpu: + metrics: + system.cpu.utilization: + enabled: true + system.cpu.logical.count: + enabled: true + memory: + metrics: + system.memory.utilization: + enabled: true + process: + mute_process_exe_error: true + mute_process_io_error: true + mute_process_user_error: true + metrics: + process.threads: + enabled: 
true + process.open_file_descriptors: + enabled: true + process.memory.utilization: + enabled: true + process.disk.operations: + enabled: true + network: + processes: + load: + +extensions: + file_storage: + directory: ${env:STORAGE_DIR} + +processors: + resourcedetection: + detectors: ["system"] + system: + hostname_sources: ["os"] + resource_attributes: + host.name: + enabled: true + host.id: + enabled: false + host.arch: + enabled: true + host.ip: + enabled: true + host.mac: + enabled: true + host.cpu.vendor.id: + enabled: true + host.cpu.family: + enabled: true + host.cpu.model.id: + enabled: true + host.cpu.model.name: + enabled: true + host.cpu.stepping: + enabled: true + host.cpu.cache.l2.size: + enabled: true + os.description: + enabled: true + os.type: + enabled: true + +exporters: + # Exporter to send logs and metrics to Elasticsearch Managed OTLP Input + otlp/ingest: + endpoint: ${env:ELASTIC_OTLP_ENDPOINT} + headers: + Authorization: ${env:ELASTIC_API_KEY} + +service: + extensions: [file_storage] + pipelines: + metrics/hostmetrics: + receivers: [hostmetrics/system] + processors: [resourcedetection] + exporters: [otlp/ingest] + logs/platformlogs: + receivers: [filelog/platformlogs] + processors: [resourcedetection] + exporters: [otlp/ingest] diff --git a/magefile.go b/magefile.go index 27e0683b0e4..c4980ab37a1 100644 --- a/magefile.go +++ b/magefile.go @@ -97,9 +97,10 @@ const ( baseURLForStagingDRA = "https://staging.elastic.co/" agentCoreProjectName = "elastic-agent-core" - helmChartPath = "./deploy/helm/elastic-agent" - helmOtelChartPath = "./deploy/helm/edot-collector/kube-stack" - sha512FileExt = ".sha512" + helmChartPath = "./deploy/helm/elastic-agent" + helmOtelChartPath = "./deploy/helm/edot-collector/kube-stack" + helmMOtelChartPath = "./deploy/helm/edot-collector/kube-stack/managed_otlp" + sha512FileExt = ".sha512" ) var ( @@ -3528,6 +3529,9 @@ func (Helm) UpdateAgentVersion() error { filepath.Join(helmOtelChartPath, "values.yaml"): { 
{"defaultCRConfig.image.tag", agentVersion}, }, + filepath.Join(helmMOtelChartPath, "values.yaml"): { + {"defaultCRConfig.image.tag", agentVersion}, + }, } { if err := updateYamlFile(yamlFile, keyVals...); err != nil { return fmt.Errorf("failed to update agent version: %w", err) @@ -3561,7 +3565,8 @@ func (h Helm) Lint() error { func updateYamlFile(path string, keyVal ...struct { key string value string -}) error { +}, +) error { data, err := os.ReadFile(path) if err != nil { return fmt.Errorf("failed to read file: %w", err) diff --git a/testing/integration/otel_helm_test.go b/testing/integration/otel_helm_test.go index f16664236a8..bfe5f4183c5 100644 --- a/testing/integration/otel_helm_test.go +++ b/testing/integration/otel_helm_test.go @@ -58,7 +58,7 @@ func TestOtelKubeStackHelm(t *testing.T) { steps []k8sTestStep }{ { - name: "helm kube-stack operator standalone agent kubernetes privileged", + name: "managed helm kube-stack operator standalone agent kubernetes privileged", steps: []k8sTestStep{ k8sStepCreateNamespace(), k8sStepHelmDeployWithValueOptions(chartLocation, "kube-stack-otel", @@ -85,6 +85,35 @@ func TestOtelKubeStackHelm(t *testing.T) { k8sStepCheckRunningPods("app.kubernetes.io/managed-by=opentelemetry-operator", 4, "otc-container"), }, }, + { + name: "mOTel helm kube-stack operator standalone agent kubernetes privileged", + steps: []k8sTestStep{ + k8sStepCreateNamespace(), + k8sStepHelmDeployWithValueOptions(chartLocation, "kube-stack-otel", + values.Options{ + ValueFiles: []string{"../../deploy/helm/edot-collector/kube-stack/managed_otlp/values.yaml"}, + Values: []string{fmt.Sprintf("defaultCRConfig.image.repository=%s", kCtx.agentImageRepo), fmt.Sprintf("defaultCRConfig.image.tag=%s", kCtx.agentImageTag)}, + + // override secrets reference with env variables + JSONValues: []string{ + // TODO: replace with managed OTLP ingest endpoint/apiKey when available + fmt.Sprintf(`collectors.gateway.env[1]={"name":"ELASTIC_OTLP_ENDPOINT","value":"%s"}`, 
"https://otlp.ingest:433"), + fmt.Sprintf(`collectors.gateway.env[2]={"name":"ELASTIC_API_KEY","value":"%s"}`, "CHANGEME=="), + }, + }, + ), + // - An OpenTelemetry Operator Deployment (1 pod per + // cluster) + k8sStepCheckRunningPods("app.kubernetes.io/name=opentelemetry-operator", 1, "manager"), + // - A Daemonset to collect K8s node's metrics and logs + // (1 EDOT collector pod per node) + // - A Cluster wide Deployment to collect K8s metrics and + // events (1 EDOT collector pod per cluster) + // - One Gateway pod to collect, aggregate and forward + // telemetry. + k8sStepCheckRunningPods("app.kubernetes.io/managed-by=opentelemetry-operator", 3, "otc-container"), + }, + }, } for _, tc := range testCases { From a4325c8f5e8cfc1dae44deb6402c875bb90a718c Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Mon, 24 Feb 2025 12:29:22 +0200 Subject: [PATCH 2/6] Allow PR comments for failures from the elastic-agent pipeline (#6980) This comment enables the native PR comment functionality from the Buildkite build bot which will allow comments in the PR about steps that failed even when they are flaky. It is enabled on the parent `elastic-agent` pipeline. It improves visibility for PR authors and the team about test failures and flakiness. 
--- catalog-info.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/catalog-info.yaml b/catalog-info.yaml index 8e3f7ce546d..6467f1a8b82 100644 --- a/catalog-info.yaml +++ b/catalog-info.yaml @@ -58,6 +58,7 @@ spec: skip_intermediate_builds: true skip_intermediate_builds_branch_filter: "!main !7.* !8.* !9.*" env: + ELASTIC_PR_COMMENTS_ENABLED: 'true' ELASTIC_SLACK_NOTIFICATIONS_ENABLED: "true" SLACK_NOTIFICATIONS_CHANNEL: "#ingest-notifications" SLACK_NOTIFICATIONS_ALL_BRANCHES: "false" From e50f7bf1637554ab572c29c013b95a997b02b09e Mon Sep 17 00:00:00 2001 From: Pavel Zorin Date: Mon, 24 Feb 2025 15:26:25 +0100 Subject: [PATCH 3/6] [CI] Windows 2025 integration tests (#6971) * [CI] Windows 2025 integration tests * Added .buildkite/bk.integration.pipeline.yml to triggers for extended tests --- .buildkite/bk.integration.pipeline.yml | 35 ++++++++++++++++++++++++++ .buildkite/pipeline.yml | 1 + 2 files changed, 36 insertions(+) diff --git a/.buildkite/bk.integration.pipeline.yml b/.buildkite/bk.integration.pipeline.yml index 8b966b9fbd9..189c7af7c0b 100644 --- a/.buildkite/bk.integration.pipeline.yml +++ b/.buildkite/bk.integration.pipeline.yml @@ -61,6 +61,41 @@ steps: matrix: - default + - label: "Win2025:sudo:{{matrix}}" + command: | + buildkite-agent artifact download build/distributions/** . --step 'packaging-windows' --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID} + .buildkite/scripts/integration-tests.ps1 {{matrix}} true + artifact_paths: + - build/** + - build/diagnostics/** + agents: + provider: "gcp" + machineType: "n1-standard-8" + image: "family/platform-ingest-elastic-agent-windows-2025" + matrix: + - default + - fleet + - fleet-endpoint-security + - fleet-privileged + - standalone-upgrade + - upgrade + - upgrade-flavor + - install-uninstall + + - label: "Win2025:non-sudo:{{matrix}}" + command: | + buildkite-agent artifact download build/distributions/** . 
--step 'packaging-windows' --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID}
+      .buildkite/scripts/integration-tests.ps1 {{matrix}} false
+    artifact_paths:
+      - build/**
+      - build/diagnostics/**
+    agents:
+      provider: "gcp"
+      machineType: "n1-standard-8"
+      image: "family/platform-ingest-elastic-agent-windows-2025"
+    matrix:
+      - default
+
   - group: "Stateful:Ubuntu"
     key: integration-tests-ubuntu
     depends_on:
diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml
index d85cf43113a..443175d17e1 100644
--- a/.buildkite/pipeline.yml
+++ b/.buildkite/pipeline.yml
@@ -288,6 +288,7 @@ steps:
             - main.go
             - .buildkite/integration.pipeline.yml
+            - .buildkite/bk.integration.pipeline.yml
             - .buildkite/pipeline.yml
             - .buildkite/scripts/
             - .buildkite/hooks/

From d68ade9f7ed6ae95c79830754fac892ee4ee0d04 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?=
Date: Mon, 24 Feb 2025 16:48:00 +0100
Subject: [PATCH 4/6] Fix config used in Otel manager test (#6966)

Otel collector exposes metrics on port 8888 by default, which can cause
conflicts with other applications in tests. Disable this.

Use the nopreceiver and debugexporter in the test instead of the
otlpreceiver and otlpexporter. This is faster and doesn't require
binding to any ports as well.
--- internal/pkg/otel/manager/manager_test.go | 32 +++++++++++------------ 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index dd540c6fb38..53d7dcac88c 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -24,38 +24,36 @@ import ( var ( testConfig = map[string]interface{}{ "receivers": map[string]interface{}{ - "otlp": map[string]interface{}{ - "protocols": map[string]interface{}{ - "grpc": map[string]interface{}{ - "endpoint": "0.0.0.0:4317", - }, - }, - }, + "nop": map[string]interface{}{}, }, "processors": map[string]interface{}{ "batch": map[string]interface{}{}, }, "exporters": map[string]interface{}{ - "otlp": map[string]interface{}{ - "endpoint": "otelcol:4317", - }, + "debug": map[string]interface{}{}, }, "service": map[string]interface{}{ + "telemetry": map[string]interface{}{ + "metrics": map[string]interface{}{ + "level": "none", + "readers": []any{}, + }, + }, "pipelines": map[string]interface{}{ "traces": map[string]interface{}{ - "receivers": []string{"otlp"}, + "receivers": []string{"nop"}, "processors": []string{"batch"}, - "exporters": []string{"otlp"}, + "exporters": []string{"debug"}, }, "metrics": map[string]interface{}{ - "receivers": []string{"otlp"}, + "receivers": []string{"nop"}, "processors": []string{"batch"}, - "exporters": []string{"otlp"}, + "exporters": []string{"debug"}, }, "logs": map[string]interface{}{ - "receivers": []string{"otlp"}, + "receivers": []string{"nop"}, "processors": []string{"batch"}, - "exporters": []string{"otlp"}, + "exporters": []string{"debug"}, }, }, }, @@ -191,7 +189,7 @@ func TestOTelManager_ConfigError(t *testing.T) { go func() { err := m.Run(ctx) - require.ErrorIs(t, err, context.Canceled, "otel manager should be cancelled") + assert.ErrorIs(t, err, context.Canceled, "otel manager should be cancelled") }() // watch is synchronous, so we need to read from 
it to avoid blocking the manager From eeffc0e5a888d90c2bbb16bda3580252565ae628 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 24 Feb 2025 16:50:56 +0100 Subject: [PATCH 5/6] Add ability to run components in the Otel manager (#6697) * Add ability to run components in the Otel manager # Conflicts: # NOTICE.txt # go.mod # go.sum * Add coordinator test * Set metricbeat receiver signal type to logs * Drop unnecessary transform processor The conversion now happens in the otel consumer in beats. * Determine default datastream type from beat name * Fix diagnostics tests * Promote output queue settings to receivers * Move otel config translation to the otel package * Emit the otel component diagnostic conditionally * Add more otel config translation tests * Code review fixes * Fix diagnostics tests * Code Review fixes * Correctly set input types if not present * More code review fixes --- NOTICE.txt | 498 +++++++++--------- go.mod | 4 +- .../application/coordinator/coordinator.go | 138 ++++- .../coordinator/coordinator_unit_test.go | 143 ++++- .../pkg/agent/application/upgrade/upgrade.go | 2 +- .../pkg/otel/configtranslate/otelconfig.go | 378 +++++++++++++ .../otel/configtranslate/otelconfig_test.go | 384 ++++++++++++++ pkg/component/runtime/command.go | 2 +- pkg/component/runtime/runtime_comm.go | 10 +- 9 files changed, 1281 insertions(+), 278 deletions(-) create mode 100644 internal/pkg/otel/configtranslate/otelconfig.go create mode 100644 internal/pkg/otel/configtranslate/otelconfig_test.go diff --git a/NOTICE.txt b/NOTICE.txt index 1b16aebc735..48dce3814db 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -16900,6 +16900,218 @@ Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/otel limitations under the License. 
+-------------------------------------------------------------------------------- +Dependency : go.opentelemetry.io/collector/pipeline +Version: v0.119.0 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/pipeline@v0.119.0/LICENSE: + + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : go.opentelemetry.io/collector/processor Version: v0.119.0 @@ -18238,6 +18450,43 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +-------------------------------------------------------------------------------- +Dependency : golang.org/x/exp +Version: v0.0.0-20240719175910-8a7402abbf56 +Licence type (autodetected): BSD-3-Clause +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/golang.org/x/exp@v0.0.0-20240719175910-8a7402abbf56/LICENSE: + +Copyright 2009 The Go Authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. 
+ * Neither the name of Google LLC nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + -------------------------------------------------------------------------------- Dependency : golang.org/x/net Version: v0.34.0 @@ -96001,218 +96250,6 @@ Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/pdat limitations under the License. --------------------------------------------------------------------------------- -Dependency : go.opentelemetry.io/collector/pipeline -Version: v0.119.0 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/pipeline@v0.119.0/LICENSE: - - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. 
- - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. 
- - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. 
Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of 
the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. 
Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. 
- - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- Dependency : go.opentelemetry.io/collector/pipeline/xpipeline Version: v0.119.0 @@ -104408,43 +104445,6 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. --------------------------------------------------------------------------------- -Dependency : golang.org/x/exp -Version: v0.0.0-20240719175910-8a7402abbf56 -Licence type (autodetected): BSD-3-Clause --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/golang.org/x/exp@v0.0.0-20240719175910-8a7402abbf56/LICENSE: - -Copyright 2009 The Go Authors. 
- -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google LLC nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
- - -------------------------------------------------------------------------------- Dependency : golang.org/x/mod Version: v0.21.0 diff --git a/go.mod b/go.mod index 9421c61ffa3..4e0235387e8 100644 --- a/go.mod +++ b/go.mod @@ -74,10 +74,12 @@ require ( go.elastic.co/ecszap v1.0.2 go.elastic.co/go-licence-detector v0.7.0 go.opentelemetry.io/collector/component/componentstatus v0.119.0 + go.opentelemetry.io/collector/pipeline v0.119.0 go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.119.0 go.opentelemetry.io/collector/receiver/nopreceiver v0.119.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.32.0 + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 golang.org/x/net v0.34.0 golang.org/x/sync v0.10.0 golang.org/x/sys v0.29.0 @@ -578,7 +580,6 @@ require ( go.opentelemetry.io/collector/pdata v1.25.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.119.0 // indirect - go.opentelemetry.io/collector/pipeline v0.119.0 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.119.0 // indirect go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.119.0 // indirect go.opentelemetry.io/collector/processor/processortest v0.119.0 // indirect @@ -617,7 +618,6 @@ require ( go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/oauth2 v0.24.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index e83547ddadf..30c7396b25e 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -13,6 +13,8 @@ import ( "sync/atomic" 
"time" + "github.com/elastic/elastic-agent/internal/pkg/otel/configtranslate" + "go.opentelemetry.io/collector/component/componentstatus" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" @@ -217,6 +219,12 @@ type Coordinator struct { otelMgr OTelManager otelCfg *confmap.Conf + // the final config sent to the manager, contains both config from hybrid mode and from components + finalOtelCfg *confmap.Conf + + // This variable controls whether we run supported components in the Otel manager instead of the runtime manager. + // It's a temporary measure until we decide exactly how we want to control where specific components run. + runComponentsInOtelManager bool caps capabilities.Capabilities modifiers []ComponentsModifier @@ -384,21 +392,22 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. LogLevel: logLevel, } c := &Coordinator{ - logger: logger, - cfg: cfg, - agentInfo: agentInfo, - isManaged: isManaged, - specs: specs, - reexecMgr: reexecMgr, - upgradeMgr: upgradeMgr, - monitorMgr: monitorMgr, - runtimeMgr: runtimeMgr, - configMgr: configMgr, - varsMgr: varsMgr, - otelMgr: otelMgr, - caps: caps, - modifiers: modifiers, - state: state, + logger: logger, + cfg: cfg, + agentInfo: agentInfo, + isManaged: isManaged, + specs: specs, + reexecMgr: reexecMgr, + upgradeMgr: upgradeMgr, + monitorMgr: monitorMgr, + runtimeMgr: runtimeMgr, + configMgr: configMgr, + varsMgr: varsMgr, + otelMgr: otelMgr, + runComponentsInOtelManager: false, // change this to run supported components in the Otel manager + caps: caps, + modifiers: modifiers, + state: state, // Note: the uses of a buffered input channel in our broadcaster (the // third parameter to broadcaster.New) means that it is possible for // immediately adjacent writes/reads not to match, e.g.: @@ -775,7 +784,7 @@ func (c *Coordinator) Run(ctx context.Context) error { // information about the state of the Elastic Agent. // Called by external goroutines. 
func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks { - return diagnostics.Hooks{ + hooks := diagnostics.Hooks{ { Name: "agent-info", Filename: "agent-info.yaml", @@ -1016,6 +1025,26 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks { }, }, } + if c.runComponentsInOtelManager { + otelComponentHook := diagnostics.Hook{ + Name: "otel-final", + Filename: "otel-final.yaml", + Description: "Final otel configuration used by the Elastic Agent. Includes hybrid mode config and component config.", + ContentType: "application/yaml", + Hook: func(_ context.Context) []byte { + if c.finalOtelCfg == nil { + return []byte("no active OTel configuration") + } + o, err := yaml.Marshal(c.finalOtelCfg.ToStringMap()) + if err != nil { + return []byte(fmt.Sprintf("error: failed to convert to yaml: %v", err)) + } + return o + }, + } + hooks = append(hooks, otelComponentHook) + } + return hooks } // runner performs the actual work of running all the managers. @@ -1227,7 +1256,6 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (err error) { if c.otelMgr != nil { c.otelCfg = cfg.OTel - c.otelMgr.Update(cfg.OTel) } return c.processConfigAgent(ctx, cfg) } @@ -1413,17 +1441,89 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) { c.logger.Debugf("Continue with missing \"signed\" properties: %v", err) } - model := component.Model{ + model := &component.Model{ Components: c.componentModel, Signed: signed, } c.logger.Info("Updating running component model") c.logger.With("components", model.Components).Debug("Updating running component model") - c.runtimeMgr.Update(model) + return c.updateManagersWithConfig(model) +} + +// updateManagersWithConfig updates runtime managers with the component model and config. +// Components may be sent to different runtimes depending on various criteria. 
+func (c *Coordinator) updateManagersWithConfig(model *component.Model) error { + runtimeModel, otelModel := c.splitModelBetweenManagers(model) + c.logger.With("components", runtimeModel.Components).Debug("Updating runtime manager model") + c.runtimeMgr.Update(*runtimeModel) + return c.updateOtelManagerConfig(otelModel) +} + +// updateOtelManagerConfig updates the otel collector configuration for the otel manager. It assembles this configuration +// from the component model passed in and from the hybrid-mode otel config set on the Coordinator. +func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error { + finalOtelCfg := confmap.New() + var componentOtelCfg *confmap.Conf + if len(model.Components) > 0 { + var err error + c.logger.With("components", model.Components).Debug("Updating otel manager model") + componentOtelCfg, err = configtranslate.GetOtelConfig(model, c.agentInfo) + if err != nil { + c.logger.Errorf("failed to generate otel config: %v", err) + } + } + if componentOtelCfg != nil { + err := finalOtelCfg.Merge(componentOtelCfg) + if err != nil { + c.logger.Error("failed to merge otel config: %v", err) + } + } + + if c.otelCfg != nil { + err := finalOtelCfg.Merge(c.otelCfg) + if err != nil { + c.logger.Error("failed to merge otel config: %v", err) + } + } + + if len(finalOtelCfg.AllKeys()) == 0 { + // if the config is empty, we want to send nil to the manager, so it knows to stop the collector + finalOtelCfg = nil + } + + c.otelMgr.Update(finalOtelCfg) + c.finalOtelCfg = finalOtelCfg return nil } +// splitModelBetweenManager splits the model components between the runtime manager and the otel manager. 
+func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) { + if !c.runComponentsInOtelManager { + // Runtime manager gets all the components, this is the default + otelModel = &component.Model{} + runtimeModel = model + return + } + var otelComponents, runtimeComponents []component.Component + for _, comp := range model.Components { + if configtranslate.IsComponentOtelSupported(&comp) { + otelComponents = append(otelComponents, comp) + } else { + runtimeComponents = append(runtimeComponents, comp) + } + } + otelModel = &component.Model{ + Components: otelComponents, + // the signed portion of the policy is only used by Defend, so otel doesn't need it for anything + } + runtimeModel = &component.Model{ + Components: runtimeComponents, + Signed: model.Signed, + } + return +} + // generateComponentModel regenerates the configuration tree and // components from the current AST and vars and returns the result. // Called from both the main Coordinator goroutine and from external diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 762309b37ef..b0697948d29 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -468,6 +468,7 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) { upgradeMgr: upgradeMgr, // Add a placeholder runtime manager that will accept any updates runtimeMgr: &fakeRuntimeManager{}, + otelMgr: &fakeOTelManager{}, // Set valid but empty initial values for ast and vars vars: emptyVars(t), @@ -583,6 +584,7 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) { }, // Add a placeholder runtime manager that will accept any updates runtimeMgr: &fakeRuntimeManager{}, + otelMgr: &fakeOTelManager{}, // Set valid but empty initial values for ast and vars vars: 
emptyVars(t), @@ -681,6 +683,7 @@ func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) { configManagerUpdate: configChan, }, runtimeMgr: runtimeManager, + otelMgr: &fakeOTelManager{}, vars: emptyVars(t), componentPIDTicker: time.NewTicker(time.Second * 30), } @@ -921,6 +924,141 @@ service: assert.Nil(t, otelConfig, "empty policy should cause otel manager to get nil config") } +func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t *testing.T) { + // Send a test policy to the Coordinator as a Config Manager update, + // verify it generates the right component model and sends components + // to both the runtime manager and the otel manager. + + // Set a one-second timeout -- nothing here should block, but if it + // does let's report a failure instead of timing out the test runner. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + logger := logp.NewLogger("testing") + + configChan := make(chan ConfigChange, 1) + + // Create a mocked runtime manager that will report the update call + var updated bool // Set by runtime manager callback + var components []component.Component // Set by runtime manager callback + runtimeManager := &fakeRuntimeManager{ + updateCallback: func(comp []component.Component) error { + updated = true + components = comp + return nil + }, + } + var otelUpdated bool // Set by otel manager callback + var otelConfig *confmap.Conf // Set by otel manager callback + otelManager := &fakeOTelManager{ + updateCallback: func(cfg *confmap.Conf) error { + otelUpdated = true + otelConfig = cfg + return nil + }, + } + + // we need the filestream spec to be able to convert to Otel config + componentSpec := component.InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Name: "filestream", + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + Platforms: []string{ + "linux/amd64", + "linux/arm64", + "darwin/amd64", + 
"darwin/arm64", + "windows/amd64", + "container/amd64", + "container/arm64", + }, + }, + } + + platform, err := component.LoadPlatformDetail() + require.NoError(t, err) + specs, err := component.NewRuntimeSpecs(platform, []component.InputRuntimeSpec{componentSpec}) + require.NoError(t, err) + + coord := &Coordinator{ + logger: logger, + agentInfo: &info.AgentInfo{}, + stateBroadcaster: broadcaster.New(State{}, 0, 0), + managerChans: managerChans{ + configManagerUpdate: configChan, + }, + runtimeMgr: runtimeManager, + otelMgr: otelManager, + runComponentsInOtelManager: true, + specs: specs, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), + } + + // Create a policy with one input and one output (no otel configuration) + cfg := config.MustNewConfigFrom(` +outputs: + default: + type: elasticsearch + hosts: + - localhost:9200 +inputs: + - id: test-input + type: filestream + use_output: default + - id: test-other-input + type: system/metrics + use_output: default +receivers: + nop: +exporters: + nop: +service: + pipelines: + traces: + receivers: + - nop + exporters: + - nop +`) + + // Send the policy change and make sure it was acknowledged. + cfgChange := &configChange{cfg: cfg} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + + // Make sure the runtime manager received the expected component update. + // An assert.Equal on the full component model doesn't play nice with + // the embedded proto structs, so instead we verify the important fields + // manually (sorry). 
+ assert.True(t, updated, "Runtime manager should be updated after a policy change") + require.Equal(t, 1, len(components), "Test policy should generate one component") + assert.True(t, otelUpdated, "OTel manager should be updated after a policy change") + require.NotNil(t, otelConfig, "OTel manager should have config") + + runtimeComponent := components[0] + assert.Equal(t, "system/metrics-default", runtimeComponent.ID) + require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error") + assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'") + require.Equal(t, 2, len(runtimeComponent.Units)) + + units := runtimeComponent.Units + // Verify the input unit + assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID) + assert.Equal(t, client.UnitTypeInput, units[0].Type) + assert.Equal(t, "test-other-input", units[0].Config.Id) + assert.Equal(t, "system/metrics", units[0].Config.Type) + + // Verify the output unit + assert.Equal(t, "system/metrics-default", units[1].ID) + assert.Equal(t, client.UnitTypeOutput, units[1].Type) + assert.Equal(t, "elasticsearch", units[1].Config.Type) +} + func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { // Set a one-second timeout -- nothing here should block, but if it // does let's report a failure instead of timing out the test runner. @@ -950,7 +1088,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { // manager, so it receives the update result. 
runtimeManagerError: updateErrChan, }, - runtimeMgr: runtimeManager, + runtimeMgr: runtimeManager, + otelMgr: &fakeOTelManager{}, + vars: emptyVars(t), componentPIDTicker: time.NewTicker(time.Second * 30), } @@ -1075,6 +1215,7 @@ func TestCoordinatorAppliesVarsToPolicy(t *testing.T) { varsManagerUpdate: varsChan, }, runtimeMgr: runtimeManager, + otelMgr: &fakeOTelManager{}, vars: emptyVars(t), componentPIDTicker: time.NewTicker(time.Second * 30), } diff --git a/internal/pkg/agent/application/upgrade/upgrade.go b/internal/pkg/agent/application/upgrade/upgrade.go index 4d4cfb2882d..922e48a18f2 100644 --- a/internal/pkg/agent/application/upgrade/upgrade.go +++ b/internal/pkg/agent/application/upgrade/upgrade.go @@ -261,7 +261,7 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string } newRunPath := filepath.Join(newHome, "run") - oldRunPath := filepath.Join(paths.Home(), "run") + oldRunPath := filepath.Join(paths.Run()) if err := copyRunDirectory(u.log, oldRunPath, newRunPath); err != nil { return nil, errors.New(err, "failed to copy run directory") diff --git a/internal/pkg/otel/configtranslate/otelconfig.go b/internal/pkg/otel/configtranslate/otelconfig.go new file mode 100644 index 00000000000..6c83e1e6836 --- /dev/null +++ b/internal/pkg/otel/configtranslate/otelconfig.go @@ -0,0 +1,378 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. 
+ +package configtranslate + +import ( + "fmt" + "path/filepath" + "slices" + "strings" + + otelcomponent "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/pipeline" + "golang.org/x/exp/maps" + + elasticsearchtranslate "github.com/elastic/beats/v7/libbeat/otelbeat/oteltranslate/outputs/elasticsearch" + "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/beats/v7/x-pack/metricbeat/mbreceiver" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/component/runtime" +) + +// This is a prefix we add to all names of Otel entities in the configuration. Its purpose is to avoid collisions with +// user-provided configuration +const OtelNamePrefix = "_agent-component/" + +type exporterConfigTranslationFunc func(*config.C) (map[string]any, error) + +var ( + OtelSupportedOutputTypes = []string{"elasticsearch"} + OtelSupportedInputTypes = []string{"filestream"} + configTranslationFuncForExporter = map[otelcomponent.Type]exporterConfigTranslationFunc{ + otelcomponent.MustNewType("elasticsearch"): translateEsOutputToExporter, + } +) + +// GetOtelConfig returns the Otel collector configuration for the given component model. +// All added component and pipelines names are prefixed with OtelNamePrefix. +// Unsupported components are quietly ignored. 
+func GetOtelConfig(model *component.Model, info info.Agent) (*confmap.Conf, error) { + components := getSupportedComponents(model) + if len(components) == 0 { + return nil, nil + } + otelConfig := confmap.New() // base config, nothing here for now + + for _, comp := range components { + componentConfig, compErr := getCollectorConfigForComponent(comp, info) + if compErr != nil { + return nil, compErr + } + // the assumption here is that each component will define its own receivers, and the shared exporters + // will be merged + mergeErr := otelConfig.Merge(componentConfig) + if mergeErr != nil { + return nil, fmt.Errorf("error merging otel config for component %s: %w", comp.ID, mergeErr) + } + } + return otelConfig, nil +} + +// IsComponentOtelSupported checks if the given component can be run in an Otel Collector. +func IsComponentOtelSupported(comp *component.Component) bool { + return slices.Contains(OtelSupportedOutputTypes, comp.OutputType) && + slices.Contains(OtelSupportedInputTypes, comp.InputType) +} + +// getSupportedComponents returns components from the given model that can be run in an Otel Collector. +func getSupportedComponents(model *component.Model) []*component.Component { + var supportedComponents []*component.Component + + for _, comp := range model.Components { + comp := comp + if IsComponentOtelSupported(&comp) { + supportedComponents = append(supportedComponents, &comp) + } + } + + return supportedComponents +} + +// getPipelineID returns the pipeline id for the given component. +func getPipelineID(comp *component.Component) (pipeline.ID, error) { + signal, err := getSignalForComponent(comp) + if err != nil { + return pipeline.ID{}, err + } + pipelineName := fmt.Sprintf("%s%s", OtelNamePrefix, comp.ID) + return pipeline.NewIDWithName(signal, pipelineName), nil +} + +// getReceiverID returns the receiver id for the given unit and exporter type. 
+func getReceiverID(receiverType otelcomponent.Type, unitID string) otelcomponent.ID { + receiverName := fmt.Sprintf("%s%s", OtelNamePrefix, unitID) + return otelcomponent.NewIDWithName(receiverType, receiverName) +} + +// getExporterID returns the exporter id for the given exporter type and output name. +func getExporterID(exporterType otelcomponent.Type, outputName string) otelcomponent.ID { + exporterName := fmt.Sprintf("%s%s", OtelNamePrefix, outputName) + return otelcomponent.NewIDWithName(exporterType, exporterName) +} + +// getCollectorConfigForComponent returns the Otel collector config required to run the given component. +// This function returns a full, valid configuration that can then be merged with configurations for other components. +func getCollectorConfigForComponent(comp *component.Component, info info.Agent) (*confmap.Conf, error) { + outputQueueConfig := getOutputQueueConfig(comp) + receiversConfig, err := getReceiversConfigForComponent(comp, info, outputQueueConfig) + if err != nil { + return nil, err + } + exportersConfig, err := getExportersConfigForComponent(comp) + if err != nil { + return nil, err + } + pipelineID, err := getPipelineID(comp) + if err != nil { + return nil, err + } + pipelinesConfig := map[string]any{ + pipelineID.String(): map[string][]string{ + "exporters": maps.Keys(exportersConfig), + "receivers": maps.Keys(receiversConfig), + }, + } + + fullConfig := map[string]any{ + "receivers": receiversConfig, + "exporters": exportersConfig, + "service": map[string]any{ + "pipelines": pipelinesConfig, + }, + } + return confmap.NewFromStringMap(fullConfig), nil +} + +// getReceiversConfigForComponent returns the receivers configuration for a component. Usually this will be a single +// receiver, but in principle it could be more. 
+func getReceiversConfigForComponent(comp *component.Component, info info.Agent, outputQueueConfig map[string]any) (map[string]any, error) { + receiverType, err := getReceiverTypeForComponent(comp) + if err != nil { + return nil, err + } + // this is necessary to convert policy config format to beat config format + defaultDataStreamType, err := getDefaultDatastreamTypeForComponent(comp) + if err != nil { + return nil, err + } + + // get inputs for all the units + // we run a single receiver for each component to mirror what beats processes do + var inputs []map[string]any + for _, unit := range comp.Units { + if unit.Type == client.UnitTypeInput { + unitInputs, err := getInputsForUnit(unit, info, defaultDataStreamType, comp.InputType) + if err != nil { + return nil, err + } + inputs = append(inputs, unitInputs...) + } + } + + receiverId := getReceiverID(receiverType, comp.ID) + // Beat config inside a beat receiver is nested under an additional key. Not sure if this simple translation is + // always safe. We should either ensure this is always the case, or have an explicit mapping. + beatName := strings.TrimSuffix(receiverType.String(), "receiver") + beatDataPath := filepath.Join(paths.Run(), comp.ID) + receiverConfig := map[string]any{ + beatName: map[string]any{ + "inputs": inputs, + }, + // the output needs to be otelconsumer + "output": map[string]any{ + "otelconsumer": map[string]any{}, + }, + // just like we do for beats processes, each receiver needs its own data path + "path": map[string]any{ + "data": beatDataPath, + }, + } + // add the output queue config if present + if outputQueueConfig != nil { + receiverConfig["output"] = outputQueueConfig + } + return map[string]any{ + receiverId.String(): receiverConfig, + }, nil +} + +// getReceiversConfigForComponent returns the exporters configuration for a component. Usually this will be a single +// exporter, but in principle it could be more. 
+func getExportersConfigForComponent(comp *component.Component) (map[string]any, error) { + exportersConfig := map[string]any{} + exporterType, err := getExporterTypeForComponent(comp) + if err != nil { + return nil, err + } + for _, unit := range comp.Units { + if unit.Type == client.UnitTypeOutput { + unitExportersConfig, expErr := unitToExporterConfig(unit, exporterType, comp.InputType) + if expErr != nil { + return nil, expErr + } + for k, v := range unitExportersConfig { + exportersConfig[k] = v + } + } + } + return exportersConfig, nil +} + +// getBeatNameForComponent returns the beat binary name that would be used to run this component. +func getBeatNameForComponent(comp *component.Component) string { + // TODO: Add this information directly to the spec? + if comp.InputSpec == nil || comp.InputSpec.BinaryName != "agentbeat" { + return "" + } + return comp.InputSpec.Spec.Command.Args[0] +} + +// getSignalForComponent returns the otel signal for the given component. Currently, this is always logs, even for +// metricbeat. +func getSignalForComponent(comp *component.Component) (pipeline.Signal, error) { + beatName := getBeatNameForComponent(comp) + switch beatName { + case "filebeat", "metricbeat": + return pipeline.SignalLogs, nil + default: + return pipeline.Signal{}, fmt.Errorf("unknown otel signal for input type: %s", comp.InputType) + } +} + +// getReceiverTypeForComponent returns the receiver type for the given component. +func getReceiverTypeForComponent(comp *component.Component) (otelcomponent.Type, error) { + beatName := getBeatNameForComponent(comp) + switch beatName { + case "filebeat": + return otelcomponent.MustNewType(fbreceiver.Name), nil + case "metricbeat": + return otelcomponent.MustNewType(mbreceiver.Name), nil + default: + return otelcomponent.Type{}, fmt.Errorf("unknown otel receiver type for input type: %s", comp.InputType) + } +} + +// getExporterTypeForComponent returns the exporter type for the given component. 
+func getExporterTypeForComponent(comp *component.Component) (otelcomponent.Type, error) {
+	// Only the elasticsearch output currently maps onto an otel exporter.
+	if comp.OutputType == "elasticsearch" {
+		return otelcomponent.MustNewType("elasticsearch"), nil
+	}
+	return otelcomponent.Type{}, fmt.Errorf("unknown otel exporter type for output type: %s", comp.OutputType)
+}
+
+// unitToExporterConfig translates a component.Unit to an otel exporter configuration.
+func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, inputType string) (map[string]any, error) {
+	if unit.Type == client.UnitTypeInput {
+		return nil, fmt.Errorf("unit type is an input, expected output: %v", unit)
+	}
+	translate, ok := configTranslationFuncForExporter[exporterType]
+	if !ok {
+		return nil, fmt.Errorf("no config translation function for exporter type: %s", exporterType)
+	}
+	// we'd like to use the same exporter for all outputs with the same name, so we parse out the name for the unit id
+	// these will be deduplicated by the configuration merging process at the end
+	outputName := strings.TrimPrefix(unit.ID, inputType+"-") // TODO: Use a more structured approach here
+	exporterId := getExporterID(exporterType, outputName)
+
+	// translate the configuration
+	rawUnitCfg := unit.Config.GetSource().AsMap() // this is what beats do in libbeat/management/generate.go
+	parsedCfg, err := config.NewConfigFrom(rawUnitCfg)
+	if err != nil {
+		return nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err)
+	}
+	translatedCfg, err := translate(parsedCfg)
+	if err != nil {
+		return nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err)
+	}
+
+	return map[string]any{exporterId.String(): translatedCfg}, nil
+}
+
+// getInputsForUnit returns the beat inputs for a unit. These can directly be plugged into a beats receiver config.
+// It mainly calls a conversion function from the control protocol client.
+func getInputsForUnit(unit component.Unit, info info.Agent, defaultDataStreamType string, inputType string) ([]map[string]any, error) {
+	agentInfo := &client.AgentInfo{
+		ID:           info.AgentID(),
+		Version:      info.Version(),
+		Snapshot:     info.Snapshot(),
+		ManagedMode:  runtime.ProtoAgentMode(info),
+		Unprivileged: info.Unprivileged(),
+	}
+	inputs, err := management.CreateInputsFromStreams(unit.Config, defaultDataStreamType, agentInfo)
+	if err != nil {
+		return nil, err
+	}
+	// Add the type to each input. CreateInputsFromStreams doesn't do this, each beat does it on its own in a transform
+	// function. For filebeat, see: https://github.com/elastic/beats/blob/main/x-pack/filebeat/cmd/agent.go
+	for i := range inputs {
+		if _, present := inputs[i]["type"]; !present {
+			inputs[i]["type"] = inputType
+		}
+	}
+
+	return inputs, nil
+}
+
+// getDefaultDatastreamTypeForComponent returns the default datastream type for a given component.
+// This is needed to translate from the agent policy config format to the beats config format.
+func getDefaultDatastreamTypeForComponent(comp *component.Component) (string, error) {
+	switch getBeatNameForComponent(comp) {
+	case "filebeat":
+		return "logs", nil
+	case "metricbeat":
+		return "metrics", nil
+	}
+	return "", fmt.Errorf("input type not supported by Otel: %s", comp.InputType)
+}
+
+// translateEsOutputToExporter translates an elasticsearch output configuration to an elasticsearch exporter configuration.
+func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) {
+	// Start from the generic beats->otel elasticsearch config translation, then layer on the
+	// settings the agent always wants for this exporter.
+	esConfig, err := elasticsearchtranslate.ToOTelConfig(cfg)
+	if err != nil {
+		return nil, err
+	}
+	// we want to use dynamic indexing
+	esConfig["logs_dynamic_index"] = map[string]any{"enabled": true}
+	esConfig["metrics_dynamic_index"] = map[string]any{"enabled": true}
+
+	// we also want to use dynamic log ids
+	esConfig["logs_dynamic_id"] = map[string]any{"enabled": true}
+
+	// for compatibility with beats, we want bodymap mapping
+	esConfig["mapping"] = map[string]any{"mode": "bodymap"}
+	return esConfig, nil
+}
+
+// This is copied from https://github.com/elastic/beats/blob/main/libbeat/otelbeat/beatconverter/beatconverter.go
+// getOutputQueueConfig gets the queue settings for the output unit in the component. We need to move these settings
+// to the receiver configuration.
+// Returns nil when the component has no output unit, the unit carries no "queue" key, or the value
+// under "queue" is not a map.
+func getOutputQueueConfig(comp *component.Component) map[string]any {
+	// find the output unit config
+	// NOTE(review): if several output units exist, the last one wins — presumably a component has at
+	// most one output unit; confirm against the component model.
+	var unitConfigMap map[string]any
+	for _, unit := range comp.Units {
+		if unit.Type == client.UnitTypeOutput {
+			unitConfigMap = unit.Config.GetSource().AsMap()
+		}
+	}
+	if unitConfigMap == nil {
+		return nil
+	}
+
+	queueConfig, ok := unitConfigMap["queue"]
+	if !ok {
+		return nil
+	}
+	queueConfigMap, ok := queueConfig.(map[string]any)
+	if !ok {
+		return nil
+	}
+
+	return queueConfigMap
+}
diff --git a/internal/pkg/otel/configtranslate/otelconfig_test.go b/internal/pkg/otel/configtranslate/otelconfig_test.go
new file mode 100644
index 00000000000..8883ae58fcb
--- /dev/null
+++ b/internal/pkg/otel/configtranslate/otelconfig_test.go
@@ -0,0 +1,384 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License 2.0;
+// you may not use this file except in compliance with the Elastic License 2.0.
+
+package configtranslate
+
+import (
+	"fmt"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"go.opentelemetry.io/collector/confmap"
+
+	"github.com/elastic/elastic-agent-client/v7/pkg/client"
+	"github.com/elastic/elastic-agent-libs/mapstr"
+	"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
+	"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
+
+	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/pipeline"
+
+	"github.com/elastic/elastic-agent/pkg/component"
+)
+
+// TestBeatNameToDefaultDatastreamType checks the beat-name -> default datastream type mapping,
+// including the error path for beats not supported by the otel runtime.
+func TestBeatNameToDefaultDatastreamType(t *testing.T) {
+	tests := []struct {
+		beatName      string
+		expectedType  string
+		expectedError error
+	}{
+		{
+			beatName:     "filebeat",
+			expectedType: "logs",
+		},
+		{
+			beatName:     "metricbeat",
+			expectedType: "metrics",
+		},
+		{
+			beatName:      "cloudbeat",
+			expectedError: fmt.Errorf("input type not supported by Otel: "),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(fmt.Sprintf("%v", tt.beatName), func(t *testing.T) {
+			comp := component.Component{
+				InputSpec: &component.InputRuntimeSpec{
+					BinaryName: "agentbeat",
+					Spec: component.InputSpec{
+						Command: &component.CommandSpec{
+							Args: []string{tt.beatName},
+						},
+					},
+				},
+			}
+			actualType, actualError := getDefaultDatastreamTypeForComponent(&comp)
+			assert.Equal(t, tt.expectedType, actualType)
+
+			if tt.expectedError != nil {
+				assert.Error(t, actualError)
+				assert.EqualError(t, actualError, tt.expectedError.Error())
+			} else {
+				assert.NoError(t, actualError)
+			}
+		})
+	}
+}
+
+// TestGetSignalForComponent checks component -> otel signal resolution; currently every supported
+// beat maps to the logs signal.
+func TestGetSignalForComponent(t *testing.T) {
+	tests := []struct {
+		name           string
+		component      component.Component
+		expectedSignal pipeline.Signal
+		expectedError  error
+	}{
+		{
+			name:          "no input spec",
+			component:     component.Component{InputType: "test"},
+			expectedError: fmt.Errorf("unknown otel signal for input type: %s", "test"),
+		},
+		{
+			name: "not agentbeat",
+			component: component.Component{
+				InputType: "test",
+				InputSpec: &component.InputRuntimeSpec{
+					BinaryName: "cloudbeat",
+				},
+			},
+			expectedError: fmt.Errorf("unknown otel signal for input type: %s", "test"),
+		},
+		{
+			name: "filebeat",
+			component: component.Component{
+				InputType: "filestream",
+				InputSpec: &component.InputRuntimeSpec{
+					BinaryName: "agentbeat",
+					Spec: component.InputSpec{
+						Command: &component.CommandSpec{
+							Args: []string{"filebeat"},
+						},
+					},
+				},
+			},
+			expectedSignal: pipeline.SignalLogs,
+		},
+		{
+			name: "metricbeat",
+			component: component.Component{
+				InputType: "filestream",
+				InputSpec: &component.InputRuntimeSpec{
+					BinaryName: "agentbeat",
+					Spec: component.InputSpec{
+						Command: &component.CommandSpec{
+							Args: []string{"metricbeat"},
+						},
+					},
+				},
+			},
+			expectedSignal: pipeline.SignalLogs,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			actualSignal, actualError := getSignalForComponent(&tt.component)
+			assert.Equal(t, tt.expectedSignal, actualSignal)
+
+			if tt.expectedError != nil {
+				assert.Error(t, actualError)
+				assert.EqualError(t, actualError, tt.expectedError.Error())
+			} else {
+				assert.NoError(t, actualError)
+			}
+		})
+	}
+}
+
+// TestGetOtelConfig verifies end-to-end translation of a component model into a full collector
+// configuration (receivers, exporters, and service pipelines).
+func TestGetOtelConfig(t *testing.T) {
+	agentInfo := &info.AgentInfo{}
+	fileStreamConfig := map[string]any{
+		"id":         "test",
+		"use_output": "default",
+		"streams": []any{
+			map[string]any{
+				"id": "test-1",
+				"data_stream": map[string]any{
+					"dataset": "generic-1",
+				},
+				"paths": []any{
+					"/var/log/*.log",
+				},
+			},
+			map[string]any{
+				"id": "test-2",
+				"data_stream": map[string]any{
+					"dataset": "generic-2",
+				},
+				"paths": []any{
+					"/var/log/*.log",
+				},
+			},
+		},
+	}
+	esOutputConfig := map[string]any{
+		"type":     "elasticsearch",
+		"hosts":    []any{"localhost:9200"},
+		"username": "elastic",
+		"password": "password",
+	}
+	// defaultProcessors mirrors the add_fields processors the agent injects for each stream.
+	defaultProcessors := func(streamId, dataset string) []any {
+		return []any{
+			mapstr.M{
+				"add_fields": mapstr.M{
+					"fields": mapstr.M{
+						"input_id": "test",
+					},
+					"target": "@metadata",
+				},
+			},
+			mapstr.M{
+				"add_fields": mapstr.M{
+					"fields": mapstr.M{
+						"dataset":   dataset,
+						"namespace": "default",
+						"type":      "logs",
+					},
+					"target": "data_stream",
+				},
+			},
+			mapstr.M{
+				"add_fields": mapstr.M{
+					"fields": mapstr.M{
+						"dataset": dataset,
+					},
+					"target": "event",
+				},
+			},
+			mapstr.M{
+				"add_fields": mapstr.M{
+					"fields": mapstr.M{
+						"stream_id": streamId,
+					},
+					"target": "@metadata",
+				},
+			},
+			mapstr.M{
+				"add_fields": mapstr.M{
+					"fields": mapstr.M{
+						"id":       agentInfo.AgentID(),
+						"snapshot": agentInfo.Snapshot(),
+						"version":  agentInfo.Version(),
+					},
+					"target": "elastic_agent",
+				},
+			},
+			mapstr.M{
+				"add_fields": mapstr.M{
+					"fields": mapstr.M{
+						"id": agentInfo.AgentID(),
+					},
+					"target": "agent",
+				},
+			},
+		}
+	}
+	tests := []struct {
+		name           string
+		model          *component.Model
+		expectedConfig *confmap.Conf
+		expectedError  error
+	}{
+		{
+			name: "no supported components",
+			model: &component.Model{
+				Components: []component.Component{
+					{
+						InputType: "test",
+						InputSpec: &component.InputRuntimeSpec{
+							BinaryName: "cloudbeat",
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "filestream",
+			model: &component.Model{
+				Components: []component.Component{
+					{
+						ID:         "filestream-default",
+						InputType:  "filestream",
+						OutputType: "elasticsearch",
+						InputSpec: &component.InputRuntimeSpec{
+							BinaryName: "agentbeat",
+							Spec: component.InputSpec{
+								Command: &component.CommandSpec{
+									Args: []string{"filebeat"},
+								},
+							},
+						},
+						Units: []component.Unit{
+							{
+								ID:     "filestream-unit",
+								Type:   client.UnitTypeInput,
+								Config: component.MustExpectedConfig(fileStreamConfig),
+							},
+							{
+								ID:     "filestream-default",
+								Type:   client.UnitTypeOutput,
+								Config: component.MustExpectedConfig(esOutputConfig),
+							},
+						},
+					},
+				},
+			},
+			expectedConfig: confmap.NewFromStringMap(map[string]any{
+				"exporters": map[string]any{
+					"elasticsearch/_agent-component/default": map[string]any{
+						"batcher": map[string]any{
+							"enabled":        true,
+							"max_size_items": 1600,
+						},
+						"mapping": map[string]any{
+							"mode": "bodymap",
+						},
+						"endpoints": []string{"http://localhost:9200"},
+						"password":  "password",
+						"user":      "elastic",
+						"retry": map[string]any{
+							"enabled":          true,
+							"initial_interval": 1 * time.Second,
+							"max_interval":     1 * time.Minute,
+							"max_retries":      3,
+						},
+						"logs_dynamic_index": map[string]any{
+							"enabled": true,
+						},
+						"logs_dynamic_id": map[string]any{
+							"enabled": true,
+						},
+						"num_workers":       0,
+						"api_key":           "",
+						"logs_index":        "filebeat-9.0.0",
+						"timeout":           90 * time.Second,
+						"idle_conn_timeout": 3 * time.Second,
+						"metrics_dynamic_index": map[string]any{
+							"enabled": true,
+						},
+					},
+				},
+				"receivers": map[string]any{
+					"filebeatreceiver/_agent-component/filestream-default": map[string]any{
+						"filebeat": map[string]any{
+							"inputs": []map[string]any{
+								{
+									"id":   "test-1",
+									"type": "filestream",
+									"data_stream": map[string]any{
+										"dataset": "generic-1",
+									},
+									"paths": []any{
+										"/var/log/*.log",
+									},
+									"index":      "logs-generic-1-default",
+									"processors": defaultProcessors("test-1", "generic-1"),
+								},
+								{
+									"id":   "test-2",
+									"type": "filestream",
+									"data_stream": map[string]any{
+										"dataset": "generic-2",
+									},
+									"paths": []any{
+										"/var/log/*.log",
+									},
+									"index":      "logs-generic-2-default",
+									"processors": defaultProcessors("test-2", "generic-2"),
+								},
+							},
+						},
+						"output": map[string]any{
+							"otelconsumer": map[string]any{},
+						},
+						"path": map[string]any{
+							"data": filepath.Join(paths.Run(), "filestream-default"),
+						},
+					},
+				},
+				"service": map[string]any{
+					"pipelines": map[string]any{
+						"logs/_agent-component/filestream-default": map[string][]string{
+							"exporters": []string{"elasticsearch/_agent-component/default"},
+							"receivers": []string{"filebeatreceiver/_agent-component/filestream-default"},
+						},
+					},
+				},
+			}),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			actualConf, actualError := GetOtelConfig(tt.model, agentInfo)
+			// compare nil directly; otherwise compare string maps for a readable diff
+			if actualConf == nil || tt.expectedConfig == nil {
+				assert.Equal(t, tt.expectedConfig, actualConf)
+			} else { // this gives a nicer diff
+				assert.Equal(t, tt.expectedConfig.ToStringMap(), actualConf.ToStringMap())
+			}
+
+			if actualConf != nil {
+				t.Logf("%v", actualConf.ToStringMap())
+			}
+			if tt.expectedError != nil {
+				assert.Error(t, actualError)
+				assert.EqualError(t, actualError, tt.expectedError.Error())
+			} else {
+				assert.NoError(t, actualError)
+			}
+		})
+	}
+}
+
+// TODO: Add unit tests for other config generation functions
diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go
index d9cb268fa8e..b8b68e2a638 100644
--- a/pkg/component/runtime/command.go
+++ b/pkg/component/runtime/command.go
@@ -380,7 +380,7 @@ func (c *commandRuntime) start(comm Communicator) error {
 	args := c.monitor.EnrichArgs(c.current.ID, c.getSpecBinaryName(), cmdSpec.Args)
 
 	// differentiate data paths
-	dataPath := filepath.Join(paths.Home(), "run", c.current.ID)
+	dataPath := filepath.Join(paths.Run(), c.current.ID)
 	_ = os.MkdirAll(dataPath, 0755)
 	args = append(args, "-E", "path.data="+dataPath)
 
diff --git a/pkg/component/runtime/runtime_comm.go b/pkg/component/runtime/runtime_comm.go
index cdeaad0a6f6..6904ad5da84 100644
--- a/pkg/component/runtime/runtime_comm.go
+++ b/pkg/component/runtime/runtime_comm.go
@@ -128,12 +128,12 @@ func (c *runtimeComm) WriteStartUpInfo(w io.Writer, services ...client.Service)
 		Services: srvs,
 		// chunking is always allowed if the client supports it
 		Supports:       []proto.ConnectionSupports{proto.ConnectionSupports_CheckinChunking},
-		MaxMessageSize: uint32(c.maxMessageSize),
+		MaxMessageSize: uint32(c.maxMessageSize), //nolint:gosec // guaranteed to be valid
 		AgentInfo: &proto.AgentInfo{
 			Id:           c.agentInfo.AgentID(),
 			Version:      c.agentInfo.Version(),
 			Snapshot:     c.agentInfo.Snapshot(),
-			Mode:         protoAgentMode(c.agentInfo),
+			Mode:         ProtoAgentMode(c.agentInfo),
 			Unprivileged: c.agentInfo.Unprivileged(),
 		},
 	}
@@ -158,7 +158,7 @@ func (c *runtimeComm) CheckinExpected(
 			Id:        c.agentInfo.AgentID(),
 			Version:   c.agentInfo.Version(),
 			Snapshot:  c.agentInfo.Snapshot(),
-			Mode:
protoAgentMode(c.agentInfo), + Mode: ProtoAgentMode(c.agentInfo), Unprivileged: c.agentInfo.Unprivileged(), } } else { @@ -439,8 +439,8 @@ func sendExpectedChunked(server proto.ElasticAgent_CheckinV2Server, msg *proto.C return nil } -// protoAgentMode converts the agent info mode bool to the AgentManagedMode enum -func protoAgentMode(agent info.Agent) proto.AgentManagedMode { +// ProtoAgentMode converts the agent info mode bool to the AgentManagedMode enum +func ProtoAgentMode(agent info.Agent) proto.AgentManagedMode { if agent.IsStandalone() { return proto.AgentManagedMode_STANDALONE } From 106a1a2925f6bfe29c1e378ce9fd8ae94d95e930 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 24 Feb 2025 21:56:37 +0000 Subject: [PATCH 6/6] [main][Automation] Update versions (#6897) * [main][Automation] Update versions These files are used for picking the starting (pre-upgrade) or ending (post-upgrade) agent versions in upgrade integration tests. 
The content is based on responses from https://www.elastic.co/api/product_versions and https://snapshots.elastic.co The current update is generated based on the following requirements: Package version: 9.1.0 ```json { "UpgradeToVersion": "9.1.0", "CurrentMajors": 1, "PreviousMajors": 1, "PreviousMinors": 2, "SnapshotBranches": [ "9.0", "8.x", "7.17" ] } ``` * fix upgrade versions --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Andrzej Stencel --- .../integration/testdata/.upgrade-test-agent-versions.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testing/integration/testdata/.upgrade-test-agent-versions.yml b/testing/integration/testdata/.upgrade-test-agent-versions.yml index 8abe24a29f1..550aa561b49 100644 --- a/testing/integration/testdata/.upgrade-test-agent-versions.yml +++ b/testing/integration/testdata/.upgrade-test-agent-versions.yml @@ -5,8 +5,8 @@ # upgrade integration tests. testVersions: + - 8.19.0-SNAPSHOT - 8.18.0-SNAPSHOT - - 8.17.1-SNAPSHOT - - 8.17.0 - - 8.16.2 + - 8.17.2 + - 8.16.4 - 7.17.28-SNAPSHOT