From eeffc0e5a888d90c2bbb16bda3580252565ae628 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 24 Feb 2025 16:50:56 +0100 Subject: [PATCH] Add ability to run components in the Otel manager (#6697) * Add ability to run components in the Otel manager # Conflicts: # NOTICE.txt # go.mod # go.sum * Add coordinator test * Set metricbeat receiver signal type to logs * Drop unnecessary transform processor The conversion now happens in the otel consumer in beats. * Determine default datastream type from beat name * Fix diagnostics tests * Promote output queue settings to receivers * Move otel config translation to the otel package * Emit the otel component diagnostic conditionally * Add more otel config translation tests * Code review fixes * Fix diagnostics tests * Code Review fixes * Correctly set input types if not present * More code review fixes --- NOTICE.txt | 498 +++++++++--------- go.mod | 4 +- .../application/coordinator/coordinator.go | 138 ++++- .../coordinator/coordinator_unit_test.go | 143 ++++- .../pkg/agent/application/upgrade/upgrade.go | 2 +- .../pkg/otel/configtranslate/otelconfig.go | 378 +++++++++++++ .../otel/configtranslate/otelconfig_test.go | 384 ++++++++++++++ pkg/component/runtime/command.go | 2 +- pkg/component/runtime/runtime_comm.go | 10 +- 9 files changed, 1281 insertions(+), 278 deletions(-) create mode 100644 internal/pkg/otel/configtranslate/otelconfig.go create mode 100644 internal/pkg/otel/configtranslate/otelconfig_test.go diff --git a/NOTICE.txt b/NOTICE.txt index 1b16aebc735..48dce3814db 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -16900,6 +16900,218 @@ Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/otel limitations under the License. +-------------------------------------------------------------------------------- +Dependency : go.opentelemetry.io/collector/pipeline +Version: v0.119.0 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/pipeline@v0.119.0/LICENSE: + + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : go.opentelemetry.io/collector/processor Version: v0.119.0 @@ -18238,6 +18450,43 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +-------------------------------------------------------------------------------- +Dependency : golang.org/x/exp +Version: v0.0.0-20240719175910-8a7402abbf56 +Licence type (autodetected): BSD-3-Clause +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/golang.org/x/exp@v0.0.0-20240719175910-8a7402abbf56/LICENSE: + +Copyright 2009 The Go Authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google LLC nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + -------------------------------------------------------------------------------- Dependency : golang.org/x/net Version: v0.34.0 @@ -96001,218 +96250,6 @@ Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/pdat limitations under the License. --------------------------------------------------------------------------------- -Dependency : go.opentelemetry.io/collector/pipeline -Version: v0.119.0 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/go.opentelemetry.io/collector/pipeline@v0.119.0/LICENSE: - - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- Dependency : go.opentelemetry.io/collector/pipeline/xpipeline Version: v0.119.0 @@ -104408,43 +104445,6 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. --------------------------------------------------------------------------------- -Dependency : golang.org/x/exp -Version: v0.0.0-20240719175910-8a7402abbf56 -Licence type (autodetected): BSD-3-Clause --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/golang.org/x/exp@v0.0.0-20240719175910-8a7402abbf56/LICENSE: - -Copyright 2009 The Go Authors. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google LLC nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -------------------------------------------------------------------------------- Dependency : golang.org/x/mod Version: v0.21.0 diff --git a/go.mod b/go.mod index 9421c61ffa3..4e0235387e8 100644 --- a/go.mod +++ b/go.mod @@ -74,10 +74,12 @@ require ( go.elastic.co/ecszap v1.0.2 go.elastic.co/go-licence-detector v0.7.0 go.opentelemetry.io/collector/component/componentstatus v0.119.0 + go.opentelemetry.io/collector/pipeline v0.119.0 go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.119.0 go.opentelemetry.io/collector/receiver/nopreceiver v0.119.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.32.0 + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 golang.org/x/net v0.34.0 golang.org/x/sync v0.10.0 golang.org/x/sys v0.29.0 @@ -578,7 +580,6 @@ require ( go.opentelemetry.io/collector/pdata v1.25.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.119.0 // indirect - go.opentelemetry.io/collector/pipeline v0.119.0 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.119.0 // indirect go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.119.0 // indirect go.opentelemetry.io/collector/processor/processortest v0.119.0 // indirect @@ -617,7 +618,6 @@ require ( go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/oauth2 v0.24.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index e83547ddadf..30c7396b25e 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -13,6 +13,8 @@ import ( "sync/atomic" "time" + "github.com/elastic/elastic-agent/internal/pkg/otel/configtranslate" + "go.opentelemetry.io/collector/component/componentstatus" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" @@ -217,6 +219,12 @@ type Coordinator struct { otelMgr OTelManager otelCfg *confmap.Conf + // the final config sent to the manager, contains both config from hybrid mode and from components + finalOtelCfg *confmap.Conf + + // This variable controls whether we run supported components in the Otel manager instead of the runtime manager. + // It's a temporary measure until we decide exactly how we want to control where specific components run. + runComponentsInOtelManager bool caps capabilities.Capabilities modifiers []ComponentsModifier @@ -384,21 +392,22 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp. LogLevel: logLevel, } c := &Coordinator{ - logger: logger, - cfg: cfg, - agentInfo: agentInfo, - isManaged: isManaged, - specs: specs, - reexecMgr: reexecMgr, - upgradeMgr: upgradeMgr, - monitorMgr: monitorMgr, - runtimeMgr: runtimeMgr, - configMgr: configMgr, - varsMgr: varsMgr, - otelMgr: otelMgr, - caps: caps, - modifiers: modifiers, - state: state, + logger: logger, + cfg: cfg, + agentInfo: agentInfo, + isManaged: isManaged, + specs: specs, + reexecMgr: reexecMgr, + upgradeMgr: upgradeMgr, + monitorMgr: monitorMgr, + runtimeMgr: runtimeMgr, + configMgr: configMgr, + varsMgr: varsMgr, + otelMgr: otelMgr, + runComponentsInOtelManager: false, // change this to run supported components in the Otel manager + caps: caps, + modifiers: modifiers, + state: state, // Note: the uses of a buffered input channel in our broadcaster (the // third parameter to broadcaster.New) means that it is possible for // immediately adjacent writes/reads not to match, e.g.: @@ -775,7 +784,7 @@ func (c *Coordinator) Run(ctx context.Context) error { // information about the state of the Elastic Agent. // Called by external goroutines. func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks { - return diagnostics.Hooks{ + hooks := diagnostics.Hooks{ { Name: "agent-info", Filename: "agent-info.yaml", @@ -1016,6 +1025,26 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks { }, }, } + if c.runComponentsInOtelManager { + otelComponentHook := diagnostics.Hook{ + Name: "otel-final", + Filename: "otel-final.yaml", + Description: "Final otel configuration used by the Elastic Agent. Includes hybrid mode config and component config.", + ContentType: "application/yaml", + Hook: func(_ context.Context) []byte { + if c.finalOtelCfg == nil { + return []byte("no active OTel configuration") + } + o, err := yaml.Marshal(c.finalOtelCfg.ToStringMap()) + if err != nil { + return []byte(fmt.Sprintf("error: failed to convert to yaml: %v", err)) + } + return o + }, + } + hooks = append(hooks, otelComponentHook) + } + return hooks } // runner performs the actual work of running all the managers. @@ -1227,7 +1256,6 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) { func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (err error) { if c.otelMgr != nil { c.otelCfg = cfg.OTel - c.otelMgr.Update(cfg.OTel) } return c.processConfigAgent(ctx, cfg) } @@ -1413,17 +1441,89 @@ func (c *Coordinator) refreshComponentModel(ctx context.Context) (err error) { c.logger.Debugf("Continue with missing \"signed\" properties: %v", err) } - model := component.Model{ + model := &component.Model{ Components: c.componentModel, Signed: signed, } c.logger.Info("Updating running component model") c.logger.With("components", model.Components).Debug("Updating running component model") - c.runtimeMgr.Update(model) + return c.updateManagersWithConfig(model) +} + +// updateManagersWithConfig updates runtime managers with the component model and config. +// Components may be sent to different runtimes depending on various criteria. +func (c *Coordinator) updateManagersWithConfig(model *component.Model) error { + runtimeModel, otelModel := c.splitModelBetweenManagers(model) + c.logger.With("components", runtimeModel.Components).Debug("Updating runtime manager model") + c.runtimeMgr.Update(*runtimeModel) + return c.updateOtelManagerConfig(otelModel) +} + +// updateOtelManagerConfig updates the otel collector configuration for the otel manager. It assembles this configuration +// from the component model passed in and from the hybrid-mode otel config set on the Coordinator. +func (c *Coordinator) updateOtelManagerConfig(model *component.Model) error { + finalOtelCfg := confmap.New() + var componentOtelCfg *confmap.Conf + if len(model.Components) > 0 { + var err error + c.logger.With("components", model.Components).Debug("Updating otel manager model") + componentOtelCfg, err = configtranslate.GetOtelConfig(model, c.agentInfo) + if err != nil { + c.logger.Errorf("failed to generate otel config: %v", err) + } + } + if componentOtelCfg != nil { + err := finalOtelCfg.Merge(componentOtelCfg) + if err != nil { + c.logger.Error("failed to merge otel config: %v", err) + } + } + + if c.otelCfg != nil { + err := finalOtelCfg.Merge(c.otelCfg) + if err != nil { + c.logger.Error("failed to merge otel config: %v", err) + } + } + + if len(finalOtelCfg.AllKeys()) == 0 { + // if the config is empty, we want to send nil to the manager, so it knows to stop the collector + finalOtelCfg = nil + } + + c.otelMgr.Update(finalOtelCfg) + c.finalOtelCfg = finalOtelCfg return nil } +// splitModelBetweenManager splits the model components between the runtime manager and the otel manager. +func (c *Coordinator) splitModelBetweenManagers(model *component.Model) (runtimeModel *component.Model, otelModel *component.Model) { + if !c.runComponentsInOtelManager { + // Runtime manager gets all the components, this is the default + otelModel = &component.Model{} + runtimeModel = model + return + } + var otelComponents, runtimeComponents []component.Component + for _, comp := range model.Components { + if configtranslate.IsComponentOtelSupported(&comp) { + otelComponents = append(otelComponents, comp) + } else { + runtimeComponents = append(runtimeComponents, comp) + } + } + otelModel = &component.Model{ + Components: otelComponents, + // the signed portion of the policy is only used by Defend, so otel doesn't need it for anything + } + runtimeModel = &component.Model{ + Components: runtimeComponents, + Signed: model.Signed, + } + return +} + // generateComponentModel regenerates the configuration tree and // components from the current AST and vars and returns the result. // Called from both the main Coordinator goroutine and from external diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index 762309b37ef..b0697948d29 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -468,6 +468,7 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) { upgradeMgr: upgradeMgr, // Add a placeholder runtime manager that will accept any updates runtimeMgr: &fakeRuntimeManager{}, + otelMgr: &fakeOTelManager{}, // Set valid but empty initial values for ast and vars vars: emptyVars(t), @@ -583,6 +584,7 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) { }, // Add a placeholder runtime manager that will accept any updates runtimeMgr: &fakeRuntimeManager{}, + otelMgr: &fakeOTelManager{}, // Set valid but empty initial values for ast and vars vars: emptyVars(t), @@ -681,6 +683,7 @@ func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) { configManagerUpdate: configChan, }, runtimeMgr: runtimeManager, + otelMgr: &fakeOTelManager{}, vars: emptyVars(t), componentPIDTicker: time.NewTicker(time.Second * 30), } @@ -921,6 +924,141 @@ service: assert.Nil(t, otelConfig, "empty policy should cause otel manager to get nil config") } +func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t *testing.T) { + // Send a test policy to the Coordinator as a Config Manager update, + // verify it generates the right component model and sends components + // to both the runtime manager and the otel manager. + + // Set a one-second timeout -- nothing here should block, but if it + // does let's report a failure instead of timing out the test runner. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + logger := logp.NewLogger("testing") + + configChan := make(chan ConfigChange, 1) + + // Create a mocked runtime manager that will report the update call + var updated bool // Set by runtime manager callback + var components []component.Component // Set by runtime manager callback + runtimeManager := &fakeRuntimeManager{ + updateCallback: func(comp []component.Component) error { + updated = true + components = comp + return nil + }, + } + var otelUpdated bool // Set by otel manager callback + var otelConfig *confmap.Conf // Set by otel manager callback + otelManager := &fakeOTelManager{ + updateCallback: func(cfg *confmap.Conf) error { + otelUpdated = true + otelConfig = cfg + return nil + }, + } + + // we need the filestream spec to be able to convert to Otel config + componentSpec := component.InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Name: "filestream", + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + Platforms: []string{ + "linux/amd64", + "linux/arm64", + "darwin/amd64", + "darwin/arm64", + "windows/amd64", + "container/amd64", + "container/arm64", + }, + }, + } + + platform, err := component.LoadPlatformDetail() + require.NoError(t, err) + specs, err := component.NewRuntimeSpecs(platform, []component.InputRuntimeSpec{componentSpec}) + require.NoError(t, err) + + coord := &Coordinator{ + logger: logger, + agentInfo: &info.AgentInfo{}, + stateBroadcaster: broadcaster.New(State{}, 0, 0), + managerChans: managerChans{ + configManagerUpdate: configChan, + }, + runtimeMgr: runtimeManager, + otelMgr: otelManager, + runComponentsInOtelManager: true, + specs: specs, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), + } + + // Create a policy with one input and one output (no otel configuration) + cfg := config.MustNewConfigFrom(` +outputs: + default: + type: elasticsearch + hosts: + - localhost:9200 +inputs: + - id: test-input + type: filestream + use_output: default + - id: test-other-input + type: system/metrics + use_output: default +receivers: + nop: +exporters: + nop: +service: + pipelines: + traces: + receivers: + - nop + exporters: + - nop +`) + + // Send the policy change and make sure it was acknowledged. + cfgChange := &configChange{cfg: cfg} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + + // Make sure the runtime manager received the expected component update. + // An assert.Equal on the full component model doesn't play nice with + // the embedded proto structs, so instead we verify the important fields + // manually (sorry). + assert.True(t, updated, "Runtime manager should be updated after a policy change") + require.Equal(t, 1, len(components), "Test policy should generate one component") + assert.True(t, otelUpdated, "OTel manager should be updated after a policy change") + require.NotNil(t, otelConfig, "OTel manager should have config") + + runtimeComponent := components[0] + assert.Equal(t, "system/metrics-default", runtimeComponent.ID) + require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error") + assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'") + require.Equal(t, 2, len(runtimeComponent.Units)) + + units := runtimeComponent.Units + // Verify the input unit + assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID) + assert.Equal(t, client.UnitTypeInput, units[0].Type) + assert.Equal(t, "test-other-input", units[0].Config.Id) + assert.Equal(t, "system/metrics", units[0].Config.Type) + + // Verify the output unit + assert.Equal(t, "system/metrics-default", units[1].ID) + assert.Equal(t, client.UnitTypeOutput, units[1].Type) + assert.Equal(t, "elasticsearch", units[1].Config.Type) +} + func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { // Set a one-second timeout -- nothing here should block, but if it // does let's report a failure instead of timing out the test runner. @@ -950,7 +1088,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { // manager, so it receives the update result. runtimeManagerError: updateErrChan, }, - runtimeMgr: runtimeManager, + runtimeMgr: runtimeManager, + otelMgr: &fakeOTelManager{}, + vars: emptyVars(t), componentPIDTicker: time.NewTicker(time.Second * 30), } @@ -1075,6 +1215,7 @@ func TestCoordinatorAppliesVarsToPolicy(t *testing.T) { varsManagerUpdate: varsChan, }, runtimeMgr: runtimeManager, + otelMgr: &fakeOTelManager{}, vars: emptyVars(t), componentPIDTicker: time.NewTicker(time.Second * 30), } diff --git a/internal/pkg/agent/application/upgrade/upgrade.go b/internal/pkg/agent/application/upgrade/upgrade.go index 4d4cfb2882d..922e48a18f2 100644 --- a/internal/pkg/agent/application/upgrade/upgrade.go +++ b/internal/pkg/agent/application/upgrade/upgrade.go @@ -261,7 +261,7 @@ func (u *Upgrader) Upgrade(ctx context.Context, version string, sourceURI string } newRunPath := filepath.Join(newHome, "run") - oldRunPath := filepath.Join(paths.Home(), "run") + oldRunPath := filepath.Join(paths.Run()) if err := copyRunDirectory(u.log, oldRunPath, newRunPath); err != nil { return nil, errors.New(err, "failed to copy run directory") diff --git a/internal/pkg/otel/configtranslate/otelconfig.go b/internal/pkg/otel/configtranslate/otelconfig.go new file mode 100644 index 00000000000..6c83e1e6836 --- /dev/null +++ b/internal/pkg/otel/configtranslate/otelconfig.go @@ -0,0 +1,378 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package configtranslate + +import ( + "fmt" + "path/filepath" + "slices" + "strings" + + otelcomponent "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/pipeline" + "golang.org/x/exp/maps" + + elasticsearchtranslate "github.com/elastic/beats/v7/libbeat/otelbeat/oteltranslate/outputs/elasticsearch" + "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/beats/v7/x-pack/metricbeat/mbreceiver" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/component/runtime" +) + +// This is a prefix we add to all names of Otel entities in the configuration. Its purpose is to avoid collisions with +// user-provided configuration +const OtelNamePrefix = "_agent-component/" + +type exporterConfigTranslationFunc func(*config.C) (map[string]any, error) + +var ( + OtelSupportedOutputTypes = []string{"elasticsearch"} + OtelSupportedInputTypes = []string{"filestream"} + configTranslationFuncForExporter = map[otelcomponent.Type]exporterConfigTranslationFunc{ + otelcomponent.MustNewType("elasticsearch"): translateEsOutputToExporter, + } +) + +// GetOtelConfig returns the Otel collector configuration for the given component model. +// All added component and pipelines names are prefixed with OtelNamePrefix. +// Unsupported components are quietly ignored. +func GetOtelConfig(model *component.Model, info info.Agent) (*confmap.Conf, error) { + components := getSupportedComponents(model) + if len(components) == 0 { + return nil, nil + } + otelConfig := confmap.New() // base config, nothing here for now + + for _, comp := range components { + componentConfig, compErr := getCollectorConfigForComponent(comp, info) + if compErr != nil { + return nil, compErr + } + // the assumption here is that each component will define its own receivers, and the shared exporters + // will be merged + mergeErr := otelConfig.Merge(componentConfig) + if mergeErr != nil { + return nil, fmt.Errorf("error merging otel config for component %s: %w", comp.ID, mergeErr) + } + } + return otelConfig, nil +} + +// IsComponentOtelSupported checks if the given component can be run in an Otel Collector. +func IsComponentOtelSupported(comp *component.Component) bool { + return slices.Contains(OtelSupportedOutputTypes, comp.OutputType) && + slices.Contains(OtelSupportedInputTypes, comp.InputType) +} + +// getSupportedComponents returns components from the given model that can be run in an Otel Collector. +func getSupportedComponents(model *component.Model) []*component.Component { + var supportedComponents []*component.Component + + for _, comp := range model.Components { + comp := comp + if IsComponentOtelSupported(&comp) { + supportedComponents = append(supportedComponents, &comp) + } + } + + return supportedComponents +} + +// getPipelineID returns the pipeline id for the given component. +func getPipelineID(comp *component.Component) (pipeline.ID, error) { + signal, err := getSignalForComponent(comp) + if err != nil { + return pipeline.ID{}, err + } + pipelineName := fmt.Sprintf("%s%s", OtelNamePrefix, comp.ID) + return pipeline.NewIDWithName(signal, pipelineName), nil +} + +// getReceiverID returns the receiver id for the given unit and exporter type. +func getReceiverID(receiverType otelcomponent.Type, unitID string) otelcomponent.ID { + receiverName := fmt.Sprintf("%s%s", OtelNamePrefix, unitID) + return otelcomponent.NewIDWithName(receiverType, receiverName) +} + +// getExporterID returns the exporter id for the given exporter type and output name. +func getExporterID(exporterType otelcomponent.Type, outputName string) otelcomponent.ID { + exporterName := fmt.Sprintf("%s%s", OtelNamePrefix, outputName) + return otelcomponent.NewIDWithName(exporterType, exporterName) +} + +// getCollectorConfigForComponent returns the Otel collector config required to run the given component. +// This function returns a full, valid configuration that can then be merged with configurations for other components. +func getCollectorConfigForComponent(comp *component.Component, info info.Agent) (*confmap.Conf, error) { + outputQueueConfig := getOutputQueueConfig(comp) + receiversConfig, err := getReceiversConfigForComponent(comp, info, outputQueueConfig) + if err != nil { + return nil, err + } + exportersConfig, err := getExportersConfigForComponent(comp) + if err != nil { + return nil, err + } + pipelineID, err := getPipelineID(comp) + if err != nil { + return nil, err + } + pipelinesConfig := map[string]any{ + pipelineID.String(): map[string][]string{ + "exporters": maps.Keys(exportersConfig), + "receivers": maps.Keys(receiversConfig), + }, + } + + fullConfig := map[string]any{ + "receivers": receiversConfig, + "exporters": exportersConfig, + "service": map[string]any{ + "pipelines": pipelinesConfig, + }, + } + return confmap.NewFromStringMap(fullConfig), nil +} + +// getReceiversConfigForComponent returns the receivers configuration for a component. Usually this will be a single +// receiver, but in principle it could be more. +func getReceiversConfigForComponent(comp *component.Component, info info.Agent, outputQueueConfig map[string]any) (map[string]any, error) { + receiverType, err := getReceiverTypeForComponent(comp) + if err != nil { + return nil, err + } + // this is necessary to convert policy config format to beat config format + defaultDataStreamType, err := getDefaultDatastreamTypeForComponent(comp) + if err != nil { + return nil, err + } + + // get inputs for all the units + // we run a single receiver for each component to mirror what beats processes do + var inputs []map[string]any + for _, unit := range comp.Units { + if unit.Type == client.UnitTypeInput { + unitInputs, err := getInputsForUnit(unit, info, defaultDataStreamType, comp.InputType) + if err != nil { + return nil, err + } + inputs = append(inputs, unitInputs...) + } + } + + receiverId := getReceiverID(receiverType, comp.ID) + // Beat config inside a beat receiver is nested under an additional key. Not sure if this simple translation is + // always safe. We should either ensure this is always the case, or have an explicit mapping. + beatName := strings.TrimSuffix(receiverType.String(), "receiver") + beatDataPath := filepath.Join(paths.Run(), comp.ID) + receiverConfig := map[string]any{ + beatName: map[string]any{ + "inputs": inputs, + }, + // the output needs to be otelconsumer + "output": map[string]any{ + "otelconsumer": map[string]any{}, + }, + // just like we do for beats processes, each receiver needs its own data path + "path": map[string]any{ + "data": beatDataPath, + }, + } + // add the output queue config if present + if outputQueueConfig != nil { + receiverConfig["output"] = outputQueueConfig + } + return map[string]any{ + receiverId.String(): receiverConfig, + }, nil +} + +// getReceiversConfigForComponent returns the exporters configuration for a component. Usually this will be a single +// exporter, but in principle it could be more. +func getExportersConfigForComponent(comp *component.Component) (map[string]any, error) { + exportersConfig := map[string]any{} + exporterType, err := getExporterTypeForComponent(comp) + if err != nil { + return nil, err + } + for _, unit := range comp.Units { + if unit.Type == client.UnitTypeOutput { + unitExportersConfig, expErr := unitToExporterConfig(unit, exporterType, comp.InputType) + if expErr != nil { + return nil, expErr + } + for k, v := range unitExportersConfig { + exportersConfig[k] = v + } + } + } + return exportersConfig, nil +} + +// getBeatNameForComponent returns the beat binary name that would be used to run this component. +func getBeatNameForComponent(comp *component.Component) string { + // TODO: Add this information directly to the spec? + if comp.InputSpec == nil || comp.InputSpec.BinaryName != "agentbeat" { + return "" + } + return comp.InputSpec.Spec.Command.Args[0] +} + +// getSignalForComponent returns the otel signal for the given component. Currently, this is always logs, even for +// metricbeat. +func getSignalForComponent(comp *component.Component) (pipeline.Signal, error) { + beatName := getBeatNameForComponent(comp) + switch beatName { + case "filebeat", "metricbeat": + return pipeline.SignalLogs, nil + default: + return pipeline.Signal{}, fmt.Errorf("unknown otel signal for input type: %s", comp.InputType) + } +} + +// getReceiverTypeForComponent returns the receiver type for the given component. +func getReceiverTypeForComponent(comp *component.Component) (otelcomponent.Type, error) { + beatName := getBeatNameForComponent(comp) + switch beatName { + case "filebeat": + return otelcomponent.MustNewType(fbreceiver.Name), nil + case "metricbeat": + return otelcomponent.MustNewType(mbreceiver.Name), nil + default: + return otelcomponent.Type{}, fmt.Errorf("unknown otel receiver type for input type: %s", comp.InputType) + } +} + +// getExporterTypeForComponent returns the exporter type for the given component. +func getExporterTypeForComponent(comp *component.Component) (otelcomponent.Type, error) { + switch comp.OutputType { + case "elasticsearch": + return otelcomponent.MustNewType("elasticsearch"), nil + default: + return otelcomponent.Type{}, fmt.Errorf("unknown otel exporter type for output type: %s", comp.OutputType) + } +} + +// unitToExporterConfig translates a component.Unit to an otel exporter configuration. +func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, inputType string) (map[string]any, error) { + if unit.Type == client.UnitTypeInput { + return nil, fmt.Errorf("unit type is an input, expected output: %v", unit) + } + configTranslationFunc, ok := configTranslationFuncForExporter[exporterType] + if !ok { + return nil, fmt.Errorf("no config translation function for exporter type: %s", exporterType) + } + // we'd like to use the same exporter for all outputs with the same name, so we parse out the name for the unit id + // these will be deduplicated by the configuration merging process at the end + outputName := strings.TrimPrefix(unit.ID, inputType+"-") // TODO: Use a more structured approach here + exporterId := getExporterID(exporterType, outputName) + + // translate the configuration + unitConfigMap := unit.Config.GetSource().AsMap() // this is what beats do in libbeat/management/generate.go + outputCfgC, err := config.NewConfigFrom(unitConfigMap) + if err != nil { + return nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) + } + exporterConfig, err := configTranslationFunc(outputCfgC) + if err != nil { + return nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) + } + + exportersCfg := map[string]any{ + exporterId.String(): exporterConfig, + } + + return exportersCfg, nil +} + +// getInputsForUnit returns the beat inputs for a unit. These can directly be plugged into a beats receiver config. +// It mainly calls a conversion function from the control protocol client. +func getInputsForUnit(unit component.Unit, info info.Agent, defaultDataStreamType string, inputType string) ([]map[string]any, error) { + agentInfo := &client.AgentInfo{ + ID: info.AgentID(), + Version: info.Version(), + Snapshot: info.Snapshot(), + ManagedMode: runtime.ProtoAgentMode(info), + Unprivileged: info.Unprivileged(), + } + inputs, err := management.CreateInputsFromStreams(unit.Config, defaultDataStreamType, agentInfo) + if err != nil { + return nil, err + } + // Add the type to each input. CreateInputsFromStreams doesn't do this, each beat does it on its own in a transform + // function. For filebeat, see: https://github.com/elastic/beats/blob/main/x-pack/filebeat/cmd/agent.go + + for _, input := range inputs { + if _, ok := input["type"]; !ok { + input["type"] = inputType + } + } + + return inputs, nil +} + +// getDefaultDatastreamTypeForComponent returns the default datastream type for a given component. +// This is needed to translate from the agent policy config format to the beats config format. +func getDefaultDatastreamTypeForComponent(comp *component.Component) (string, error) { + beatName := getBeatNameForComponent(comp) + switch beatName { + case "filebeat": + return "logs", nil + case "metricbeat": + return "metrics", nil + default: + return "", fmt.Errorf("input type not supported by Otel: %s", comp.InputType) + } +} + +// translateEsOutputToExporter translates an elasticsearch output configuration to an elasticsearch exporter configuration. +func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) { + esConfig, err := elasticsearchtranslate.ToOTelConfig(cfg) + if err != nil { + return nil, err + } + // we want to use dynamic indexing + esConfig["logs_dynamic_index"] = map[string]any{"enabled": true} + esConfig["metrics_dynamic_index"] = map[string]any{"enabled": true} + + // we also want to use dynamic log ids + esConfig["logs_dynamic_id"] = map[string]any{"enabled": true} + + // for compatibility with beats, we want bodymap mapping + esConfig["mapping"] = map[string]any{"mode": "bodymap"} + return esConfig, nil +} + +// This is copied from https://github.com/elastic/beats/blob/main/libbeat/otelbeat/beatconverter/beatconverter.go +// getOutputQueueConfig gets the queue settings for the output unit in the component. We need to move these settings +// to the receiver configuration. +func getOutputQueueConfig(comp *component.Component) map[string]any { + // find the output unit config + var unitConfigMap map[string]any + for _, unit := range comp.Units { + if unit.Type == client.UnitTypeOutput { + unitConfigMap = unit.Config.GetSource().AsMap() + } + } + if unitConfigMap == nil { + return nil + } + + queueConfig, ok := unitConfigMap["queue"] + if !ok { + return nil + } + queueConfigMap, ok := queueConfig.(map[string]any) + if !ok { + return nil + } + + return queueConfigMap +} diff --git a/internal/pkg/otel/configtranslate/otelconfig_test.go b/internal/pkg/otel/configtranslate/otelconfig_test.go new file mode 100644 index 00000000000..8883ae58fcb --- /dev/null +++ b/internal/pkg/otel/configtranslate/otelconfig_test.go @@ -0,0 +1,384 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package configtranslate + +import ( + "fmt" + "path/filepath" + "testing" + "time" + + "go.opentelemetry.io/collector/confmap" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pipeline" + + "github.com/elastic/elastic-agent/pkg/component" +) + +func TestBeatNameToDefaultDatastreamType(t *testing.T) { + tests := []struct { + beatName string + expectedType string + expectedError error + }{ + { + beatName: "filebeat", + expectedType: "logs", + }, + { + beatName: "metricbeat", + expectedType: "metrics", + }, + { + beatName: "cloudbeat", + expectedError: fmt.Errorf("input type not supported by Otel: "), + }, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("%v", tt.beatName), func(t *testing.T) { + comp := component.Component{ + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{tt.beatName}, + }, + }, + }, + } + actualType, actualError := getDefaultDatastreamTypeForComponent(&comp) + assert.Equal(t, tt.expectedType, actualType) + + if tt.expectedError != nil { + assert.Error(t, actualError) + assert.EqualError(t, actualError, tt.expectedError.Error()) + } else { + assert.NoError(t, actualError) + } + }) + } +} + +func TestGetSignalForComponent(t *testing.T) { + tests := []struct { + name string + component component.Component + expectedSignal pipeline.Signal + expectedError error + }{ + { + name: "no input spec", + component: component.Component{InputType: "test"}, + expectedError: fmt.Errorf("unknown otel signal for input type: %s", "test"), + }, + { + name: "not agentbeat", + component: component.Component{ + InputType: "test", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "cloudbeat", + }, + }, + expectedError: fmt.Errorf("unknown otel signal for input type: %s", "test"), + }, + { + name: "filebeat", + component: component.Component{ + InputType: "filestream", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + }, + expectedSignal: pipeline.SignalLogs, + }, + { + name: "metricbeat", + component: component.Component{ + InputType: "filestream", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"metricbeat"}, + }, + }, + }, + }, + expectedSignal: pipeline.SignalLogs, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualSignal, actualError := getSignalForComponent(&tt.component) + assert.Equal(t, tt.expectedSignal, actualSignal) + + if tt.expectedError != nil { + assert.Error(t, actualError) + assert.EqualError(t, actualError, tt.expectedError.Error()) + } else { + assert.NoError(t, actualError) + } + }) + } +} + +func TestGetOtelConfig(t *testing.T) { + agentInfo := &info.AgentInfo{} + fileStreamConfig := map[string]any{ + "id": "test", + "use_output": "default", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "paths": []any{ + "/var/log/*.log", + }, + }, + map[string]any{ + "id": "test-2", + "data_stream": map[string]any{ + "dataset": "generic-2", + }, + "paths": []any{ + "/var/log/*.log", + }, + }, + }, + } + esOutputConfig := map[string]any{ + "type": "elasticsearch", + "hosts": []any{"localhost:9200"}, + "username": "elastic", + "password": "password", + } + defaultProcessors := func(streamId, dataset string) []any { + return []any{ + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "input_id": "test", + }, + "target": "@metadata", + }, + }, + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "dataset": dataset, + "namespace": "default", + "type": "logs", + }, + "target": "data_stream", + }, + }, + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "dataset": dataset, + }, + "target": "event", + }, + }, + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "stream_id": streamId, + }, + "target": "@metadata", + }, + }, + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "id": agentInfo.AgentID(), + "snapshot": agentInfo.Snapshot(), + "version": agentInfo.Version(), + }, + "target": "elastic_agent", + }, + }, + mapstr.M{ + "add_fields": mapstr.M{ + "fields": mapstr.M{ + "id": agentInfo.AgentID(), + }, + "target": "agent", + }, + }, + } + } + tests := []struct { + name string + model *component.Model + expectedConfig *confmap.Conf + expectedError error + }{ + { + name: "no supported components", + model: &component.Model{ + Components: []component.Component{ + { + InputType: "test", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "cloudbeat", + }, + }, + }, + }, + }, + { + name: "filestream", + model: &component.Model{ + Components: []component.Component{ + { + ID: "filestream-default", + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filestream-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(fileStreamConfig), + }, + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig), + }, + }, + }, + }, + }, + expectedConfig: confmap.NewFromStringMap(map[string]any{ + "exporters": map[string]any{ + "elasticsearch/_agent-component/default": map[string]any{ + "batcher": map[string]any{ + "enabled": true, + "max_size_items": 1600, + }, + "mapping": map[string]any{ + "mode": "bodymap", + }, + "endpoints": []string{"http://localhost:9200"}, + "password": "password", + "user": "elastic", + "retry": map[string]any{ + "enabled": true, + "initial_interval": 1 * time.Second, + "max_interval": 1 * time.Minute, + "max_retries": 3, + }, + "logs_dynamic_index": map[string]any{ + "enabled": true, + }, + "logs_dynamic_id": map[string]any{ + "enabled": true, + }, + "num_workers": 0, + "api_key": "", + "logs_index": "filebeat-9.0.0", + "timeout": 90 * time.Second, + "idle_conn_timeout": 3 * time.Second, + "metrics_dynamic_index": map[string]any{ + "enabled": true, + }, + }, + }, + "receivers": map[string]any{ + "filebeatreceiver/_agent-component/filestream-default": map[string]any{ + "filebeat": map[string]any{ + "inputs": []map[string]any{ + { + "id": "test-1", + "type": "filestream", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "paths": []any{ + "/var/log/*.log", + }, + "index": "logs-generic-1-default", + "processors": defaultProcessors("test-1", "generic-1"), + }, + { + "id": "test-2", + "type": "filestream", + "data_stream": map[string]any{ + "dataset": "generic-2", + }, + "paths": []any{ + "/var/log/*.log", + }, + "index": "logs-generic-2-default", + "processors": defaultProcessors("test-2", "generic-2"), + }, + }, + }, + "output": map[string]any{ + "otelconsumer": map[string]any{}, + }, + "path": map[string]any{ + "data": filepath.Join(paths.Run(), "filestream-default"), + }, + }, + }, + "service": map[string]any{ + "pipelines": map[string]any{ + "logs/_agent-component/filestream-default": map[string][]string{ + "exporters": []string{"elasticsearch/_agent-component/default"}, + "receivers": []string{"filebeatreceiver/_agent-component/filestream-default"}, + }, + }, + }, + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualConf, actualError := GetOtelConfig(tt.model, agentInfo) + if actualConf == nil || tt.expectedConfig == nil { + assert.Equal(t, tt.expectedConfig, actualConf) + } else { // this gives a nicer diff + assert.Equal(t, tt.expectedConfig.ToStringMap(), actualConf.ToStringMap()) + } + + if actualConf != nil { + t.Logf("%v", actualConf.ToStringMap()) + } + if tt.expectedError != nil { + assert.Error(t, actualError) + assert.EqualError(t, actualError, tt.expectedError.Error()) + } else { + assert.NoError(t, actualError) + } + }) + } +} + +// TODO: Add unit tests for other config generation functions diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index d9cb268fa8e..b8b68e2a638 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -380,7 +380,7 @@ func (c *commandRuntime) start(comm Communicator) error { args := c.monitor.EnrichArgs(c.current.ID, c.getSpecBinaryName(), cmdSpec.Args) // differentiate data paths - dataPath := filepath.Join(paths.Home(), "run", c.current.ID) + dataPath := filepath.Join(paths.Run(), c.current.ID) _ = os.MkdirAll(dataPath, 0755) args = append(args, "-E", "path.data="+dataPath) diff --git a/pkg/component/runtime/runtime_comm.go b/pkg/component/runtime/runtime_comm.go index cdeaad0a6f6..6904ad5da84 100644 --- a/pkg/component/runtime/runtime_comm.go +++ b/pkg/component/runtime/runtime_comm.go @@ -128,12 +128,12 @@ func (c *runtimeComm) WriteStartUpInfo(w io.Writer, services ...client.Service) Services: srvs, // chunking is always allowed if the client supports it Supports: []proto.ConnectionSupports{proto.ConnectionSupports_CheckinChunking}, - MaxMessageSize: uint32(c.maxMessageSize), + MaxMessageSize: uint32(c.maxMessageSize), //nolint:gosec // guaranteed to be valid AgentInfo: &proto.AgentInfo{ Id: c.agentInfo.AgentID(), Version: c.agentInfo.Version(), Snapshot: c.agentInfo.Snapshot(), - Mode: protoAgentMode(c.agentInfo), + Mode: ProtoAgentMode(c.agentInfo), Unprivileged: c.agentInfo.Unprivileged(), }, } @@ -158,7 +158,7 @@ func (c *runtimeComm) CheckinExpected( Id: c.agentInfo.AgentID(), Version: c.agentInfo.Version(), Snapshot: c.agentInfo.Snapshot(), - Mode: protoAgentMode(c.agentInfo), + Mode: ProtoAgentMode(c.agentInfo), Unprivileged: c.agentInfo.Unprivileged(), } } else { @@ -439,8 +439,8 @@ func sendExpectedChunked(server proto.ElasticAgent_CheckinV2Server, msg *proto.C return nil } -// protoAgentMode converts the agent info mode bool to the AgentManagedMode enum -func protoAgentMode(agent info.Agent) proto.AgentManagedMode { +// ProtoAgentMode converts the agent info mode bool to the AgentManagedMode enum +func ProtoAgentMode(agent info.Agent) proto.AgentManagedMode { if agent.IsStandalone() { return proto.AgentManagedMode_STANDALONE }