diff --git a/NOTICE.txt b/NOTICE.txt index 7459922..0b48738 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -642,12 +642,12 @@ these terms. -------------------------------------------------------------------------------- -Dependency : github.com/elastic/go-ucfg -Version: v0.8.6 +Dependency : github.com/elastic/go-elasticsearch/v8 +Version: v8.2.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-ucfg@v0.8.6/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-elasticsearch/v8@v8.2.0/LICENSE: Apache License Version 2.0, January 2004 @@ -829,7 +829,7 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-ucfg@v0.8.6/ APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "{}" + boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a @@ -837,7 +837,7 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-ucfg@v0.8.6/ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright {yyyy} {name of copyright owner} + Copyright 2018 Elasticsearch BV Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -853,42 +853,12 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-ucfg@v0.8.6/ -------------------------------------------------------------------------------- -Dependency : github.com/gofrs/uuid -Version: v4.2.0+incompatible -Licence type (autodetected): MIT --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/gofrs/uuid@v4.2.0+incompatible/LICENSE: - -Copyright (C) 2013-2018 by Maxim Bublis - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
- - --------------------------------------------------------------------------------- -Dependency : github.com/magefile/mage -Version: v1.14.0 +Dependency : github.com/elastic/go-ucfg +Version: v0.8.6 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/magefile/mage@v1.14.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-ucfg@v0.8.6/LICENSE: Apache License Version 2.0, January 2004 @@ -1078,7 +1048,7 @@ Contents of probable licence file $GOMODCACHE/github.com/magefile/mage@v1.14.0/L same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2017 the Mage authors + Copyright {yyyy} {name of copyright owner} Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -1094,14 +1064,44 @@ Contents of probable licence file $GOMODCACHE/github.com/magefile/mage@v1.14.0/L -------------------------------------------------------------------------------- -Dependency : github.com/spf13/cobra -Version: v1.3.0 +Dependency : github.com/gofrs/uuid +Version: v4.2.0+incompatible +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/gofrs/uuid@v4.2.0+incompatible/LICENSE: + +Copyright (C) 2013-2018 by Maxim Bublis + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this 
permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +-------------------------------------------------------------------------------- +Dependency : github.com/magefile/mage +Version: v1.14.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/spf13/cobra@v1.3.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/magefile/mage@v1.14.0/LICENSE: - Apache License + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ @@ -1276,47 +1276,43 @@ Contents of probable licence file $GOMODCACHE/github.com/spf13/cobra@v1.3.0/LICE incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. + END OF TERMS AND CONDITIONS --------------------------------------------------------------------------------- -Dependency : github.com/stretchr/testify -Version: v1.7.1 -Licence type (autodetected): MIT --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/stretchr/testify@v1.7.1/LICENSE: + APPENDIX: How to apply the Apache License to your work. -MIT License + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) 
The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. -Copyright (c) 2012-2020 Mat Ryer, Tyler Bunnell and contributors. + Copyright 2017 the Mage authors -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. + http://www.apache.org/licenses/LICENSE-2.0 -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
-------------------------------------------------------------------------------- -Dependency : go.elastic.co/apm -Version: v1.15.0 +Dependency : github.com/spf13/cobra +Version: v1.3.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/go.elastic.co/apm@v1.15.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/spf13/cobra@v1.3.0/LICENSE.txt: - Apache License + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ @@ -1491,32 +1487,36 @@ Contents of probable licence file $GOMODCACHE/go.elastic.co/apm@v1.15.0/LICENSE: incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. - END OF TERMS AND CONDITIONS - APPENDIX: How to apply the Apache License to your work. +-------------------------------------------------------------------------------- +Dependency : github.com/stretchr/testify +Version: v1.7.1 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. +Contents of probable licence file $GOMODCACHE/github.com/stretchr/testify@v1.7.1/LICENSE: - Copyright 2018 Elasticsearch BV +MIT License - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. 
- You may obtain a copy of the License at +Copyright (c) 2012-2020 Mat Ryer, Tyler Bunnell and contributors. - http://www.apache.org/licenses/LICENSE-2.0 +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. -------------------------------------------------------------------------------- @@ -21104,217 +21104,6 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-concert@v0.2 limitations under the License. 
--------------------------------------------------------------------------------- -Dependency : github.com/elastic/go-elasticsearch/v8 -Version: v8.2.0 -Licence type (autodetected): Apache-2.0 --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-elasticsearch/v8@v8.2.0/LICENSE: - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. 
- - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. 
Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. 
You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. 
- - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. 
In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. 
- - Copyright 2018 Elasticsearch BV - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-libaudit/v2 Version: v2.3.2-0.20220729123722-f8f7d5c19e6b diff --git a/controller/runner.go b/controller/runner.go index 049316b..9515604 100644 --- a/controller/runner.go +++ b/controller/runner.go @@ -207,7 +207,7 @@ func (r *ServerRunner) Close() (err error) { } func outputFromConfig(config output.Config, queue *queue.Queue) (Output, error) { - if config.Elasticsearch != nil { + if config.Elasticsearch != nil && config.Elasticsearch.Enabled { return elasticsearch.NewElasticSearch(config.Elasticsearch, queue), nil } if config.Kafka != nil && config.Kafka.Enabled { diff --git a/go.mod b/go.mod index 15eb7d1..f841409 100644 --- a/go.mod +++ b/go.mod @@ -15,11 +15,11 @@ require ( github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220810153818-dd118efed5a5 github.com/elastic/elastic-agent-client/v7 v7.0.1 github.com/elastic/elastic-agent-shipper-client v0.4.0 + github.com/elastic/go-elasticsearch/v8 v8.2.0 github.com/elastic/go-ucfg v0.8.6 github.com/gofrs/uuid v4.2.0+incompatible github.com/magefile/mage v1.14.0 github.com/stretchr/testify v1.7.1 - go.elastic.co/apm v1.15.0 go.elastic.co/go-licence-detector v0.5.0 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced @@ -35,6 +35,7 @@ require ( github.com/dustin/go-humanize v1.0.0 // 
indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect + github.com/elastic/elastic-transport-go/v8 v8.1.0 // indirect github.com/elastic/go-licenser v0.4.1 // indirect github.com/elastic/go-structform v0.0.10 // indirect github.com/elastic/go-sysinfo v1.8.1 // indirect @@ -74,7 +75,6 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/xdg/scram v1.0.3 // indirect github.com/xdg/stringprep v1.0.3 // indirect - go.elastic.co/apm/module/apmelasticsearch/v2 v2.0.0 // indirect go.elastic.co/apm/module/apmhttp/v2 v2.0.0 // indirect go.elastic.co/apm/v2 v2.1.0 // indirect go.elastic.co/ecszap v1.0.1 // indirect @@ -82,7 +82,6 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.22.0 // indirect - golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect diff --git a/go.sum b/go.sum index 386ed0a..58cd720 100644 --- a/go.sum +++ b/go.sum @@ -508,11 +508,13 @@ github.com/elastic/elastic-agent-shipper-client v0.2.0/go.mod h1:OyI2W+Mv3JxlkEF github.com/elastic/elastic-agent-shipper-client v0.4.0 h1:nsTJF9oo4RHLl+zxFUZqNHaE86C6Ba5aImfegcEf6Sk= github.com/elastic/elastic-agent-shipper-client v0.4.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= github.com/elastic/elastic-agent-system-metrics v0.4.4/go.mod h1:tF/f9Off38nfzTZHIVQ++FkXrDm9keFhFpJ+3pQ00iI= +github.com/elastic/elastic-transport-go/v8 v8.1.0 h1:NeqEz1ty4RQz+TVbUrpSU7pZ48XkzGWQj02k5koahIE= github.com/elastic/elastic-transport-go/v8 v8.1.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= github.com/elastic/go-concert v0.2.0/go.mod h1:HWjpO3IAEJUxOeaJOWXWEp7imKd27foxz9V5vegC/38= +github.com/elastic/go-concert v0.2.0/go.mod 
h1:HWjpO3IAEJUxOeaJOWXWEp7imKd27foxz9V5vegC/38= +github.com/elastic/go-elasticsearch/v8 v8.2.0 h1:oagGcb1gqxT7yWpQ3E7wMP3NhGRamsKVd7kRdbuI+/Y= github.com/elastic/go-elasticsearch/v8 v8.2.0/go.mod h1:yY52i2Vj0unLz+N3Nwx1gM5LXwoj3h2dgptNGBYkMLA= github.com/elastic/go-libaudit/v2 v2.3.2-0.20220729123722-f8f7d5c19e6b/go.mod h1:+ZE0czqmbqtnRkl0fNgpI+HvVVRo/ZMJdcXv/PaKcOo= -github.com/elastic/go-licenser v0.3.1/go.mod h1:D8eNQk70FOCVBl3smCGQt/lv7meBeQno2eI1S5apiHQ= github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU= github.com/elastic/go-licenser v0.4.1 h1:1xDURsc8pL5zYT9R29425J3vkHdt4RT5TNEMeRN48x4= github.com/elastic/go-licenser v0.4.1/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU= @@ -523,7 +525,6 @@ github.com/elastic/go-seccomp-bpf v1.2.0/go.mod h1:l+89Vy5BzjVcaX8USZRMOwmwwDScE github.com/elastic/go-structform v0.0.9/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4= github.com/elastic/go-structform v0.0.10 h1:oy08o/Ih2hHTkNcRY/1HhaYvIp5z6t8si8gnCJPDo1w= github.com/elastic/go-structform v0.0.10/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4= -github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4= github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM= @@ -1539,9 +1540,6 @@ github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs= github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA= github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg= -go.elastic.co/apm v1.15.0 
h1:uPk2g/whK7c7XiZyz/YCUnAUBNPiyNeE3ARX3G6Gx7Q= -go.elastic.co/apm v1.15.0/go.mod h1:dylGv2HKR0tiCV+wliJz1KHtDyuD8SPe69oV7VyK6WY= -go.elastic.co/apm/module/apmelasticsearch/v2 v2.0.0 h1:5UaI4agfuGoGRhpFVb6s63Hj/9xtqYxF7kd6T77tNAw= go.elastic.co/apm/module/apmelasticsearch/v2 v2.0.0/go.mod h1:GmYz+KDp2LDAa2nd/qJ+OkrjWVEoCRIPAWarbUvyt6Y= go.elastic.co/apm/module/apmhttp/v2 v2.0.0 h1:GNfmK1LD4nE5fYqbLxROCpg1ucyjSFG5iwulxwAJ+3o= go.elastic.co/apm/module/apmhttp/v2 v2.0.0/go.mod h1:5KmxcNN7hkJh8sVW3Ggl/pYgnwiNenygE46bZoUb9RE= @@ -1682,7 +1680,6 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= -golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= diff --git a/output/console.go b/output/console.go index 2ba5214..6b5db31 100644 --- a/output/console.go +++ b/output/console.go @@ -36,10 +36,9 @@ func (out *ConsoleOutput) Start() error { // time for the output to shut down. break } - for i := 0; i < batch.Count(); i++ { - if event, ok := batch.Entry(i).(*messages.Event); ok { - out.send(event) - } + events := batch.Events() + for i := 0; i < len(events); i++ { + out.send(events[i]) } // This tells the queue that we're done with these events // and they can be safely discarded. 
The Beats queue interface @@ -48,7 +47,7 @@ func (out *ConsoleOutput) Start() error { // shipper to track events by their queue IDs so outputs // can report status back to the server; see // https://github.com/elastic/elastic-agent-shipper/issues/27. - batch.Done() + batch.Done(uint64(len(events))) } }() return nil diff --git a/output/elasticsearch/bulk.go b/output/elasticsearch/bulk.go deleted file mode 100644 index dd063cf..0000000 --- a/output/elasticsearch/bulk.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package elasticsearch - -import ( - "bytes" - "errors" - - "github.com/elastic/elastic-agent-libs/logp" -) - -var ( - errExpectedItemsArray = errors.New("expected items array") - errExpectedItemObject = errors.New("expected item response object") - errExpectedStatusCode = errors.New("expected item status code") - errUnexpectedEmptyObject = errors.New("empty object") - errExpectedObjectEnd = errors.New("expected end of object") - - nameItems = []byte("items") - nameStatus = []byte("status") - nameError = []byte("error") -) - -// bulkReadToItems reads the bulk response up to (but not including) items -func bulkReadToItems(reader *jsonReader) error { - if err := reader.ExpectDict(); err != nil { - return errExpectedObject - } - - // find 'items' field in response - for { - kind, name, err := reader.nextFieldName() - if err != nil { - return err - } - - if kind == dictEnd { - return errExpectedItemsArray - } - - // found items array -> continue - if bytes.Equal(name, nameItems) { - break - } - - _, _ = reader.ignoreNext() - } - - // check items field is an array - if err := reader.ExpectArray(); err != nil { - return errExpectedItemsArray - } - - return nil -} - -// bulkReadItemStatus reads the status and error fields from the bulk item 
-func bulkReadItemStatus(logger *logp.Logger, reader *jsonReader) (int, []byte, error) { - // skip outer dictionary - if err := reader.ExpectDict(); err != nil { - return 0, nil, errExpectedItemObject - } - - // find first field in outer dictionary (e.g. 'create') - kind, _, err := reader.nextFieldName() - if err != nil { - logger.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err - } - if kind == dictEnd { - err = errUnexpectedEmptyObject - logger.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err - } - - // parse actual item response code and error message - status, msg, err := itemStatusInner(reader, logger) - if err != nil { - logger.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err - } - - // close dictionary. Expect outer dictionary to have only one element - kind, _, err = reader.step() - if err != nil { - logger.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err - } - if kind != dictEnd { - err = errExpectedObjectEnd - logger.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err - } - - return status, msg, nil -} - -func itemStatusInner(reader *jsonReader, logger *logp.Logger) (int, []byte, error) { - if err := reader.ExpectDict(); err != nil { - return 0, nil, errExpectedItemObject - } - - status := -1 - var msg []byte - for { - kind, name, err := reader.nextFieldName() - if err != nil { - logger.Errorf("Failed to parse bulk response item: %s", err) - } - if kind == dictEnd { - break - } - - switch { - case bytes.Equal(name, nameStatus): // name == "status" - status, err = reader.nextInt() - if err != nil { - logger.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err - } - - case bytes.Equal(name, nameError): // name == "error" - msg, err = reader.ignoreNext() // collect raw string for "error" field - if err != nil { - return 0, nil, err - } - - default: // ignore unknown fields - _, err = reader.ignoreNext() - if err 
!= nil { - return 0, nil, err - } - } - } - - if status < 0 { - return 0, nil, errExpectedStatusCode - } - return status, msg, nil -} diff --git a/output/elasticsearch/callbacks.go b/output/elasticsearch/callbacks.go deleted file mode 100644 index f5f13d4..0000000 --- a/output/elasticsearch/callbacks.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package elasticsearch - -import ( - "sync" - - "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" - - "github.com/gofrs/uuid" -) - -// ConnectCallback defines the type for the function to be called when the Elasticsearch client successfully connects to the cluster -type ConnectCallback func(*eslegclient.Connection) error - -// Callbacks must not depend on the result of a previous one, -// because the ordering is not fixed. -type callbacksRegistry struct { - callbacks map[uuid.UUID]ConnectCallback - mutex sync.Mutex -} - -// XXX: it would be fantastic to do this without a package global -var connectCallbackRegistry = newCallbacksRegistry() - -// NOTE(ph): We need to refactor this, right now this is the only way to ensure that every calls -// to an ES cluster executes a callback. -var globalCallbackRegistry = newCallbacksRegistry() - -func newCallbacksRegistry() callbacksRegistry { - return callbacksRegistry{ - callbacks: make(map[uuid.UUID]ConnectCallback), - } -} - -// RegisterGlobalCallback register a global callbacks. 
-func RegisterGlobalCallback(callback ConnectCallback) (uuid.UUID, error) { - globalCallbackRegistry.mutex.Lock() - defer globalCallbackRegistry.mutex.Unlock() - - // find the next unique key - var key uuid.UUID - var err error - exists := true - for exists { - key, err = uuid.NewV4() - if err != nil { - return uuid.Nil, err - } - _, exists = globalCallbackRegistry.callbacks[key] - } - - globalCallbackRegistry.callbacks[key] = callback - return key, nil -} - -// RegisterConnectCallback registers a callback for the elasticsearch output -// The callback is called each time the client connects to elasticsearch. -// It returns the key of the newly added callback, so it can be deregistered later. -func RegisterConnectCallback(callback ConnectCallback) (uuid.UUID, error) { - connectCallbackRegistry.mutex.Lock() - defer connectCallbackRegistry.mutex.Unlock() - - // find the next unique key - var key uuid.UUID - var err error - exists := true - for exists { - key, err = uuid.NewV4() - if err != nil { - return uuid.Nil, err - } - _, exists = connectCallbackRegistry.callbacks[key] - } - - connectCallbackRegistry.callbacks[key] = callback - return key, nil -} - -// DeregisterGlobalCallback deregisters a callback for the elasticsearch output -// specified by its key. If a callback does not exist, nothing happens. -func DeregisterGlobalCallback(key uuid.UUID) { - globalCallbackRegistry.mutex.Lock() - defer globalCallbackRegistry.mutex.Unlock() - - delete(globalCallbackRegistry.callbacks, key) -} - -// DeregisterConnectCallback deregisters a callback for the elasticsearch output -// specified by its key. If a callback does not exist, nothing happens. 
-func DeregisterConnectCallback(key uuid.UUID) { - connectCallbackRegistry.mutex.Lock() - defer connectCallbackRegistry.mutex.Unlock() - - delete(connectCallbackRegistry.callbacks, key) -} diff --git a/output/elasticsearch/client.go b/output/elasticsearch/client.go deleted file mode 100644 index 4eef166..0000000 --- a/output/elasticsearch/client.go +++ /dev/null @@ -1,417 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package elasticsearch - -import ( - "context" - "errors" - "fmt" - "net/http" - "time" - - "go.elastic.co/apm" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/beat/events" - "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" - "github.com/elastic/beats/v7/libbeat/outputs" - "github.com/elastic/beats/v7/libbeat/outputs/outil" - "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-libs/testing" - "github.com/elastic/elastic-agent-libs/version" - - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" -) - -var ( - errPayloadTooLarge = errors.New("the bulk payload is too large for the server. Consider to adjust `http.max_content_length` parameter in Elasticsearch or `bulk_max_size` in the beat. The batch has been dropped") - - //nolint:stylecheck // Elasticsearch should be capitalized - ErrTooOld = errors.New("Elasticsearch is too old. Please upgrade the instance. If you would like to connect to older instances set output.elasticsearch.allow_older_versions to true.") -) - -// Client is an elasticsearch client. 
-type Client struct { - conn eslegclient.Connection - - //index outputs.IndexSelector - //pipeline *outil.Selector - - //observer outputs.Observer - - log *logp.Logger -} - -// ClientSettings contains the settings for a client. -type ClientSettings struct { - eslegclient.ConnectionSettings - Index outputs.IndexSelector - Pipeline *outil.Selector - Observer outputs.Observer - NonIndexableAction string -} - -type bulkResultStats struct { - acked int // number of events ACKed by Elasticsearch - duplicates int // number of events failed with `create` due to ID already being indexed - fails int // number of failed events (can be retried) - nonIndexable int // number of failed events (not indexable) - tooMany int // number of events receiving HTTP 429 Too Many Requests -} - -// NewClient instantiates a new client. -func NewClient( - s ClientSettings, - onConnect *callbacksRegistry, -) (*Client, error) { - conn, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ - URL: s.URL, - Beatname: s.Beatname, - Username: s.Username, - Password: s.Password, - APIKey: s.APIKey, - Headers: s.Headers, - Kerberos: s.Kerberos, - Observer: s.Observer, - Parameters: s.Parameters, - CompressionLevel: s.CompressionLevel, - EscapeHTML: s.EscapeHTML, - Transport: s.Transport, - }) - if err != nil { - return nil, err - } - - conn.OnConnectCallback = func() error { - globalCallbackRegistry.mutex.Lock() - defer globalCallbackRegistry.mutex.Unlock() - - for _, callback := range globalCallbackRegistry.callbacks { - err := callback(conn) - if err != nil { - return err - } - } - - if onConnect != nil { - onConnect.mutex.Lock() - defer onConnect.mutex.Unlock() - - for _, callback := range onConnect.callbacks { - err := callback(conn) - if err != nil { - return err - } - } - } - return nil - } - - client := &Client{ - conn: *conn, - //index: s.Index, - //pipeline: pipeline, - //observer: s.Observer, - - log: logp.NewLogger("elasticsearch"), - } - - return client, nil -} - -/*type 
PublishResult struct { - retry []*messages.Event - drop []*messages.Event - err error -} -*/ - -/*func (client *Client) Publish(ctx context.Context, events []*messages.Event) error { - rest, err := client.publishEvents(ctx, events) - - switch { - case err == errPayloadTooLarge: - // report fatal error - case len(rest) == 0: - batch.ACK() - default: - batch.RetryEvents(rest) - } - return err -}*/ - -func mapstrForValue(v *messages.Value) interface{} { - if boolVal, ok := v.GetKind().(*messages.Value_BoolValue); ok { - return boolVal.BoolValue - } - if listVal, ok := v.GetKind().(*messages.Value_ListValue); ok { - return mapstrForList(listVal.ListValue) - } - if nullVal, ok := v.GetKind().(*messages.Value_NullValue); ok { - return nullVal.NullValue - } - if intVal, ok := v.GetKind().(*messages.Value_NumberValue); ok { - return intVal.NumberValue - } - if strVal, ok := v.GetKind().(*messages.Value_StringValue); ok { - return strVal.StringValue - } - if structVal, ok := v.GetKind().(*messages.Value_StructValue); ok { - return mapstrForStruct(structVal.StructValue) - } - if tsVal, ok := v.GetKind().(*messages.Value_TimestampValue); ok { - return tsVal.TimestampValue.AsTime() - } - return nil -} - -func mapstrForList(list *messages.ListValue) []interface{} { - results := []interface{}{} - for _, val := range list.Values { - results = append(results, mapstrForValue(val)) - } - return results -} - -func mapstrForStruct(proto *messages.Struct) mapstr.M { - data := proto.GetData() - result := mapstr.M{} - for key, value := range data { - result[key] = mapstrForValue(value) - } - return result -} - -func beatsEventForProto(e *messages.Event) *beat.Event { - return &beat.Event{ - Timestamp: e.GetTimestamp().AsTime(), - Meta: mapstrForStruct(e.GetMetadata()), - Fields: mapstrForStruct(e.GetFields()), - } -} - -// PublishEvents sends all events to elasticsearch. On error a slice with all -// events not published or confirmed to be processed by elasticsearch will be -// returned. 
The input slice backing memory will be reused by return the value. -func (client *Client) publishEvents(ctx context.Context, data []*messages.Event) ([]*messages.Event, error) { - span, ctx := apm.StartSpan(ctx, "publishEvents", "output") - defer span.End() - begin := time.Now() - - /*st := client.observer - - if st != nil { - st.NewBatch(len(data)) - }*/ - - if len(data) == 0 { - return nil, nil - } - - // encode events into bulk request buffer, dropping failed elements from - // events slice - origCount := len(data) - span.Context.SetLabel("events_original", origCount) - - data, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), data) - newCount := len(data) - - span.Context.SetLabel("events_encoded", newCount) - /*if st != nil && origCount > newCount { - st.Dropped(origCount - newCount) - }*/ - if newCount == 0 { - return nil, nil - } - - status, result, sendErr := client.conn.Bulk(ctx, "", "", nil, bulkItems) - - if sendErr != nil { - if status == http.StatusRequestEntityTooLarge { - sendErr = errPayloadTooLarge - } - err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", sendErr)) - err.Send() - client.log.Error(err) - return data, sendErr - } - pubCount := len(data) - span.Context.SetLabel("events_published", pubCount) - - client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.", - pubCount, - time.Since(begin)) - - // check response for transient errors - var failedEvents []*messages.Event - var stats bulkResultStats - if status != 200 { - failedEvents = data - stats.fails = len(failedEvents) - } else { - failedEvents, _ = client.bulkCollectPublishFails(result, data) - } - - failed := len(failedEvents) - span.Context.SetLabel("events_failed", failed) - /*if st := client.observer; st != nil { - dropped := stats.nonIndexable - duplicates := stats.duplicates - acked := len(data) - failed - dropped - duplicates - - st.Acked(acked) - st.Failed(failed) - st.Dropped(dropped) - 
st.Duplicate(duplicates) - st.ErrTooMany(stats.tooMany) - }*/ - - if failed > 0 { - if sendErr == nil { - sendErr = eslegclient.ErrTempBulkFailure - } - return failedEvents, sendErr - } - return nil, nil -} - -// bulkEncodePublishRequest encodes all bulk requests and returns slice of events -// successfully added to the list of bulk items and the list of bulk items. -func (client *Client) bulkEncodePublishRequest(version version.V, data []*messages.Event) ([]*messages.Event, []interface{}) { - okEvents := make([]*messages.Event, len(data)) - bulkItems := []interface{}{} - for _, event := range data { - meta, err := client.createEventBulkMeta(version, event) - if err != nil { - client.log.Errorf("Failed to encode event meta data: %+v", err) - continue - } - bulkItems = append(bulkItems, meta, beatsEventForProto(event)) - okEvents = append(okEvents, event) - } - return okEvents, bulkItems -} - -func GetOpType(e *messages.Event) events.OpType { - opStr, err := getMetaStringValue(e, events.FieldMetaOpType) - if err != nil { - return events.OpTypeDefault - } - - switch opStr { - case "create": - return events.OpTypeCreate - case "index": - return events.OpTypeIndex - case "delete": - return events.OpTypeDelete - } - - return events.OpTypeDefault -} - -func (client *Client) createEventBulkMeta(version version.V, event *messages.Event) (interface{}, error) { - eventType := "" - /*if version.Major < 7 { - eventType = defaultEventType - }*/ - - pipeline := "" - /*pipeline, err := client.getPipeline(event) - if err != nil { - err := fmt.Errorf("failed to select pipeline: %v", err) - return nil, err - }*/ - - index := "elastic-agent-shipper" - /*index, err := client.index.Select(event) - if err != nil { - err := fmt.Errorf("failed to select event index: %v", err) - return nil, err - }*/ - - id, _ := getMetaStringValue(event, events.FieldMetaID) - opType := GetOpType(event) - - meta := eslegclient.BulkMeta{ - Index: index, - DocType: eventType, - Pipeline: pipeline, - ID: id, 
- } - - if opType == events.OpTypeIndex { - return eslegclient.BulkIndexAction{Index: meta}, nil - } - return eslegclient.BulkCreateAction{Create: meta}, nil -} - -// bulkCollectPublishFails checks per item errors returning all events -// to be tried again due to error code returned for that items. If indexing an -// event failed due to some error in the event itself (e.g. does not respect mapping), -// the event will be dropped. -func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, data []*messages.Event) ([]*messages.Event, bulkResultStats) { - reader := newJSONReader(result) - if err := bulkReadToItems(reader); err != nil { - client.log.Errorf("failed to parse bulk response: %v", err.Error()) - return nil, bulkResultStats{} - } - - count := len(data) - failed := data[:0] - stats := bulkResultStats{} - for i := 0; i < count; i++ { - status, msg, err := bulkReadItemStatus(client.log, reader) - if err != nil { - client.log.Error(err) - return nil, bulkResultStats{} - } - - if status < 300 { - stats.acked++ - continue // ok value - } - - if status == 409 { - // 409 is used to indicate an event with same ID already exists if - // `create` op_type is used. 
- stats.duplicates++ - continue // ok - } - - if status < 500 { - if status == http.StatusTooManyRequests { - stats.tooMany++ - } else { - stats.nonIndexable++ - client.log.Warnf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg) - continue - } - } - - client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg) - stats.fails++ - failed = append(failed, data[i]) - } - - return failed, stats -} - -func (client *Client) Connect() error { - return client.conn.Connect() -} - -func (client *Client) Close() error { - return client.conn.Close() -} - -func (client *Client) String() string { - return "elasticsearch(" + client.conn.URL + ")" -} - -func (client *Client) Test(d testing.Driver) { - client.conn.Test(d) -} diff --git a/output/elasticsearch/config.go b/output/elasticsearch/config.go index b4eb1d8..4d5073e 100644 --- a/output/elasticsearch/config.go +++ b/output/elasticsearch/config.go @@ -5,12 +5,16 @@ package elasticsearch import ( + "crypto/tls" "fmt" + "net/http" "time" "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/transport/httpcommon" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" + "github.com/elastic/go-elasticsearch/v8" ) // Config specifies all configurable parameters for the Elasticsearch output. 
@@ -44,31 +48,6 @@ type Backoff struct { Max time.Duration } -/*const ( - defaultBulkSize = 50 -)*/ - -/*var ( - defaultConfig = Config{ - Protocol: "", - Path: "", - Params: nil, - Username: "", - Password: "", - APIKey: "", - MaxRetries: 3, - CompressionLevel: 0, - EscapeHTML: false, - Kerberos: nil, - LoadBalance: true, - Backoff: Backoff{ - Init: 1 * time.Second, - Max: 60 * time.Second, - }, - Transport: httpcommon.DefaultHTTPTransportSettings(), - } -)*/ - func (c *Config) Validate() error { if c.APIKey != "" && (c.Username != "" || c.Password != "") { return fmt.Errorf("cannot set both api_key and username/password") @@ -76,3 +55,24 @@ func (c *Config) Validate() error { return nil } + +// esConfig converts the configuration for the elasticsearch shipper output +// to the configuration for the go-elasticsearch client API. +func (c Config) esConfig() elasticsearch.Config { + tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12} + if c.Transport.TLS.VerificationMode == tlscommon.VerifyNone { + // Unlike Beats, the shipper doesn't support the ability to verify the + // certificate but not the hostname, so any setting except VerifyNone + // falls back on full verification. + tlsConfig.InsecureSkipVerify = true + } + cfg := elasticsearch.Config{ + Addresses: c.Hosts, + Username: c.Username, + Password: c.Password, + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + } + return cfg +} diff --git a/output/elasticsearch/json_read.go b/output/elasticsearch/json_read.go deleted file mode 100644 index 67ccbac..0000000 --- a/output/elasticsearch/json_read.go +++ /dev/null @@ -1,573 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -package elasticsearch - -import ( - "errors" - - "github.com/elastic/beats/v7/libbeat/common/streambuf" -) - -// SAX like json parser. But instead of relying on callbacks, state machine -// returns raw item plus entity. On top of state machine additional helper methods -// like ExpectDict, ExpectArray, nextFieldName and nextInt are available for -// low-level parsing/stepping through a json document. -// -// Due to parser simply stepping through the input buffer, almost no additional -// allocations are required. -type jsonReader struct { - streambuf.Buffer - - // parser state machine - states []state // state stack for nested arrays/objects - currentState state - - // preallocate stack memory for up to 32 nested arrays/objects - statesBuf [32]state -} - -var ( - errFailing = errors.New("JSON parser failed") - errUnknownChar = errors.New("unknown character") - errQuoteMissing = errors.New("missing closing quote") - errExpectColon = errors.New("expected ':' after map key") - errUnexpectedDictClose = errors.New("unexpected '}'") - errUnexpectedArrClose = errors.New("unexpected ']'") - errExpectedDigit = errors.New("expected a digit") - errExpectedObject = errors.New("expected JSON object") - errExpectedArray = errors.New("expected JSON array") - errExpectedFieldName = errors.New("expected JSON object field name") - errExpectedInteger = errors.New("expected integer value") - errExpectedNull = errors.New("expected null value") - errExpectedFalse = errors.New("expected false value") - errExpectedTrue = errors.New("expected true value") - errExpectedArrayField = errors.New("expected ']' or ','") -) - -var ( - nullSymbol = []byte("null") - trueSymbol = []byte("true") - falseSymbol = []byte("false") -) - -type entity uint8 - -const ( - failEntity entity = iota - trueValue - falseValue - nullValue - dictStart - dictEnd - arrStart - arrEnd - stringEntity - mapKeyEntity - intEntity - doubleEntity -) - -type state uint8 - -const ( - failedState state = iota - startState - 
arrState - arrStateNext - dictState - dictFieldState - dictFieldStateEnd -) - -var entityNames = map[entity]string{ - failEntity: "failEntity", - trueValue: "trueValue", - falseValue: "falseValue", - nullValue: "nullValue", - dictStart: "dictStart", - dictEnd: "dictEnd", - arrStart: "arrStart", - arrEnd: "arrEnd", - stringEntity: "stringEntity", - mapKeyEntity: "mapKeyEntity", - intEntity: "intEntity", - doubleEntity: "doubleEntity", -} - -var stateNames = map[state]string{ - failedState: "failed", - startState: "start", - arrState: "array", - arrStateNext: "arrayNext", - dictState: "dict", - dictFieldState: "dictValue", - dictFieldStateEnd: "dictNext", -} - -func (e entity) String() string { - if name, ok := entityNames[e]; ok { - return name - } - return "unknown" -} - -func (s state) String() string { - if name, ok := stateNames[s]; ok { - return name - } - return "unknown" -} - -func newJSONReader(in []byte) *jsonReader { - r := &jsonReader{} - r.init(in) - return r -} - -func (r *jsonReader) init(in []byte) { - r.Buffer.Init(in, true) - r.currentState = startState - r.states = r.statesBuf[:0] -} - -var whitespace = []byte(" \t\r\n") - -func (r *jsonReader) skipWS() { - _ = r.IgnoreSymbols(whitespace) -} - -func (r *jsonReader) pushState(next state) { - if r.currentState != failedState { - r.states = append(r.states, r.currentState) - } - r.currentState = next -} - -func (r *jsonReader) popState() { - if len(r.states) == 0 { - r.currentState = failedState - } else { - last := len(r.states) - 1 - r.currentState = r.states[last] - r.states = r.states[:last] - } -} - -// ExpectDict checks if the next entity is a json object -func (r *jsonReader) ExpectDict() error { - e, _, err := r.step() - - if err != nil { - return err - } - - if e != dictStart { - return r.SetError(errExpectedObject) - } - - return nil -} - -// ExpectArray checks if the next entity is a json array -func (r *jsonReader) ExpectArray() error { - e, _, err := r.step() - if err != nil { - return 
err - } - - if e != arrStart { - return r.SetError(errExpectedArray) - } - - return nil -} - -func (r *jsonReader) nextFieldName() (entity, []byte, error) { - e, raw, err := r.step() - if err != nil { - return e, raw, err - } - - if e != mapKeyEntity && e != dictEnd { - return e, nil, r.SetError(errExpectedFieldName) - } - - return e, raw, err -} - -func (r *jsonReader) nextInt() (int, error) { - e, raw, err := r.step() - if err != nil { - return 0, err - } - - if e != intEntity { - return 0, errExpectedInteger - } - - tmp := streambuf.NewFixed(raw) - i, err := tmp.IntASCII(false) - return int(i), err -} - -// ignore type of next element and return raw content. -func (r *jsonReader) ignoreNext() (raw []byte, err error) { - r.skipWS() - - snapshot := r.Snapshot() - before := r.Len() - - e, _, err := r.step() - if err != nil { - return nil, err - } - - switch e { - case arrStart: - err = ignoreKind(r, arrEnd) - case dictStart: - err = ignoreKind(r, dictEnd) - default: - } - if err != nil { - return nil, err - } - - after := r.Len() - r.Restore(snapshot) - - bytes, _ := r.Collect(before - after) - return bytes, nil -} - -func ignoreKind(r *jsonReader, kind entity) error { - for { - e, _, err := r.step() - if err != nil { - return err - } - - switch e { - case kind: - return nil - case arrStart: - if err := ignoreKind(r, arrEnd); err != nil { - return err - } - case dictStart: - if err := ignoreKind(r, dictEnd); err != nil { - return err - } - } - } -} - -// step continues the JSON parser state machine until next entity has been parsed. 
-func (r *jsonReader) step() (entity, []byte, error) { - r.skipWS() - switch r.currentState { - case failedState: - return r.stepFailing() - case startState: - return r.stepStart() - case arrState: - return r.stepArray() - case arrStateNext: - return r.stepArrayNext() - case dictState: - return r.stepDict() - case dictFieldState: - return r.stepDictValue() - case dictFieldStateEnd: - return r.stepDictValueEnd() - default: - return r.failWith(errFailing) - } -} - -func (r *jsonReader) stepFailing() (entity, []byte, error) { - return failEntity, nil, r.Err() -} - -func (r *jsonReader) stepStart() (entity, []byte, error) { - c, err := r.PeekByte() - if err != nil { - return r.failWith(err) - } - - return r.tryStepPrimitive(c) -} - -func (r *jsonReader) stepArray() (entity, []byte, error) { - return r.doStepArray(true) -} - -func (r *jsonReader) stepArrayNext() (entity, []byte, error) { - c, err := r.PeekByte() - if err != nil { - return r.failWith(errFailing) - } - - switch c { - case ']': - return r.endArray() - case ',': - _ = r.Advance(1) - r.skipWS() - r.currentState = arrState - return r.doStepArray(false) - default: - return r.failWith(errExpectedArrayField) - } -} - -func (r *jsonReader) doStepArray(allowArrayEnd bool) (entity, []byte, error) { - c, err := r.PeekByte() - if err != nil { - return r.failWith(err) - } - - if c == ']' { - if !allowArrayEnd { - return r.failWith(errUnexpectedArrClose) - } - return r.endArray() - } - - r.currentState = arrStateNext - return r.tryStepPrimitive(c) -} - -func (r *jsonReader) stepDict() (entity, []byte, error) { - return r.doStepDict(true) -} - -func (r *jsonReader) doStepDict(allowEnd bool) (entity, []byte, error) { - c, err := r.PeekByte() - if err != nil { - return r.failWith(err) - } - - switch c { - case '}': - if !allowEnd { - return r.failWith(errUnexpectedDictClose) - } - return r.endDict() - case '"': - r.currentState = dictFieldState - return r.stepMapKey() - default: - return r.failWith(errExpectedFieldName) - 
} -} - -func (r *jsonReader) stepDictValue() (entity, []byte, error) { - c, err := r.PeekByte() - if err != nil { - return r.failWith(err) - } - - r.currentState = dictFieldStateEnd - return r.tryStepPrimitive(c) -} - -func (r *jsonReader) stepDictValueEnd() (entity, []byte, error) { - c, err := r.PeekByte() - if err != nil { - return r.failWith(err) - } - - switch c { - case '}': - return r.endDict() - case ',': - _ = r.Advance(1) - r.skipWS() - r.currentState = dictState - return r.doStepDict(false) - default: - return r.failWith(errUnknownChar) - } -} - -func (r *jsonReader) tryStepPrimitive(c byte) (entity, []byte, error) { - switch c { - case '{': // start dictionary - return r.startDict() - case '[': // start array - return r.startArray() - case 'n': // null - return r.stepNull() - case 'f': // false - return r.stepFalse() - case 't': // true - return r.stepTrue() - case '"': - return r.stepString() - default: - // parse number? - if c == '-' || c == '+' || c == '.' || ('0' <= c && c <= '9') { - return r.stepNumber() - } - - err := r.Err() - if err == nil { - err = r.SetError(errUnknownChar) - } - return r.failWith(err) - } -} - -func (r *jsonReader) stepNull() (entity, []byte, error) { - return stepSymbol(r, nullValue, nullSymbol, errExpectedNull) -} - -func (r *jsonReader) stepTrue() (entity, []byte, error) { - return stepSymbol(r, trueValue, trueSymbol, errExpectedTrue) -} - -func (r *jsonReader) stepFalse() (entity, []byte, error) { - return stepSymbol(r, falseValue, falseSymbol, errExpectedFalse) -} - -func stepSymbol(r *jsonReader, e entity, symb []byte, fail error) (entity, []byte, error) { - ok, err := r.MatchASCII(symb) - if err != nil { - return failEntity, nil, err - } - if !ok { - return failEntity, nil, fail - } - - _ = r.Advance(len(symb)) - return e, nil, nil -} - -func (r *jsonReader) stepMapKey() (entity, []byte, error) { - e, key, err := r.stepString() - if err != nil { - return e, key, err - } - - r.skipWS() - c, err := r.ReadByte() - if 
err != nil { - return failEntity, nil, err - } - - if c != ':' { - return r.failWith(r.SetError(errExpectColon)) - } - - if err := r.Err(); err != nil { - return r.failWith(err) - } - return mapKeyEntity, key, nil -} - -func (r *jsonReader) stepString() (entity, []byte, error) { - start := 1 - for { - idxQuote := r.IndexByteFrom(start, '"') - if idxQuote == -1 { - return failEntity, nil, r.SetError(errQuoteMissing) - } - - if b, _ := r.PeekByteFrom(idxQuote - 1); b == '\\' { // escaped quote? - start = idxQuote + 1 - continue - } - - // found string end - str, err := r.Collect(idxQuote + 1) - str = str[1 : len(str)-1] - return stringEntity, str, err - } -} - -func (r *jsonReader) startDict() (entity, []byte, error) { - _ = r.Advance(1) - r.pushState(dictState) - return dictStart, nil, nil -} - -func (r *jsonReader) endDict() (entity, []byte, error) { - _ = r.Advance(1) - r.popState() - return dictEnd, nil, nil -} - -func (r *jsonReader) startArray() (entity, []byte, error) { - _ = r.Advance(1) - r.pushState(arrState) - return arrStart, nil, nil -} - -func (r *jsonReader) endArray() (entity, []byte, error) { - _ = r.Advance(1) - r.popState() - return arrEnd, nil, nil -} - -func (r *jsonReader) failWith(err error) (entity, []byte, error) { - r.currentState = failedState - return failEntity, nil, r.SetError(err) -} - -func (r *jsonReader) stepNumber() (entity, []byte, error) { - snapshot := r.Snapshot() - lenBefore := r.Len() - isDouble := false - - if err := r.Err(); err != nil { - return failEntity, nil, err - } - - // parse '+', '-' or '.' - if b, _ := r.PeekByte(); b == '-' || b == '+' { - _ = r.Advance(1) - } - if b, _ := r.PeekByte(); b == '.' { - _ = r.Advance(1) - isDouble = true - } - - // parse digits - buf, _ := r.CollectWhile(isDigit) - if len(buf) == 0 { - return failEntity, nil, r.SetError(errExpectedDigit) - } - - if !isDouble { - // parse optional '.' - if b, _ := r.PeekByte(); b == '.' 
{ - _ = r.Advance(1) - isDouble = true - - // parse optional digits - _, _ = r.CollectWhile(isDigit) - } - } - - lenAfter := r.Len() - r.Restore(snapshot) - total := lenBefore - lenAfter - 1 - if total == 0 { - return failEntity, nil, r.SetError(errExpectedDigit) - } - - raw, _ := r.Collect(total) - state := intEntity - if isDouble { - state = doubleEntity - } - - return state, raw, nil -} - -func isDigit(c byte) bool { - return '0' <= c && c <= '9' -} diff --git a/output/elasticsearch/output.go b/output/elasticsearch/output.go index 98f518c..df4ca2c 100644 --- a/output/elasticsearch/output.go +++ b/output/elasticsearch/output.go @@ -5,15 +5,17 @@ package elasticsearch import ( + "bytes" "context" - "fmt" + "encoding/json" "sync" + "time" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" "github.com/elastic/elastic-agent-shipper/queue" + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esutil" ) type ElasticSearchOutput struct { @@ -22,6 +24,9 @@ type ElasticSearchOutput struct { queue *queue.Queue + client *elasticsearch.Client + bulkIndexer esutil.BulkIndexer + wg sync.WaitGroup } @@ -35,17 +40,36 @@ func NewElasticSearch(config *Config, queue *queue.Queue) *ElasticSearchOutput { return out } -func (out *ElasticSearchOutput) Start() error { - client, err := makeES(*out.config) +func serializeEvent(event *messages.Event) ([]byte, error) { + // TODO: we need to preprocess the raw protobuf to get fields in the + // right place for ECS. This just translates the protobuf structure + // directly to json. 
+	return json.Marshal(event)
+}
+
+func (es *ElasticSearchOutput) Start() error {
+	client, err := elasticsearch.NewClient(es.config.esConfig())
 	if err != nil {
 		return err
 	}
+	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
+		Index:         "elastic-agent-shipper-test",
+		Client:        client,
+		NumWorkers:    1,
+		FlushBytes:    1e+8, // 100MB
+		FlushInterval: 30 * time.Second,
+	})
+	if err != nil {
+		return err
+	}
+	es.client = client
+	es.bulkIndexer = bi
 
-	out.wg.Add(1)
+	es.wg.Add(1)
 	go func() {
-		defer out.wg.Done()
+		defer es.wg.Done()
 		for {
-			batch, err := out.queue.Get(1000)
+			batch, err := es.queue.Get(1000)
 			// Once an output receives a batch, it is responsible for
 			// it until all events have been either successfully sent or
 			// discarded after failure.
@@ -54,122 +78,53 @@ func (out *ElasticSearchOutput) Start() error {
 				// time for the output to shut down.
 				break
 			}
-			count := batch.Count()
-			events := make([]*messages.Event, count)
-			for i := 0; i < batch.Count(); i++ {
-				events[i], _ = batch.Entry(i).(*messages.Event)
-			}
-			for len(events) > 0 {
-				events, _ = client.publishEvents(context.TODO(), events)
-				// TODO: error handling / retry backoff
+			// Add this batch to the shutdown wait group and release it
+			// in the batch's completion callback
+			es.wg.Add(1)
+			batch.CompletionCallback = es.wg.Done
+
+			events := batch.Events()
+			for _, event := range events {
+				serialized, err := serializeEvent(event)
+				if err != nil {
+					es.logger.Errorf("failed to serialize event: %v", err)
+					continue
+				}
+				err = bi.Add(
+					context.Background(),
+					esutil.BulkIndexerItem{
+						Action: "index",
+						Body:   bytes.NewReader(serialized),
+						OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
+							// TODO: update metrics
+							batch.Done(1)
+						},
+						OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
+							// TODO: update metrics
+							batch.Done(1)
+						},
+					},
+				)
+				if err != nil {
+					
es.logger.Errorf("couldn't add to bulk index request: %v", err) + // This event couldn't be attempted, so mark it as finished. + batch.Done(1) + } } - // This tells the queue that we're done with these events - // and they can be safely discarded. The Beats queue interface - // doesn't provide a way to indicate failure, of either the - // full batch or individual events. The plan is for the - // shipper to track events by their queue IDs so outputs - // can report status back to the server; see - // https://github.com/elastic/elastic-agent-shipper/issues/27. - batch.Done() + } + + // Close the bulk indexer + if err := bi.Close(context.Background()); err != nil { + es.logger.Errorf("error closing bulk indexer: %s", err) } }() return nil } -// Wait until the output loop has finished. This doesn't stop the -// loop by itself, so make sure you only call it when you close -// the queue. -func (out *ElasticSearchOutput) Wait() { - out.wg.Wait() -} - -func makeES( - /*im outputs.IndexManager, - beat beat.Info, - observer outputs.Observer,*/ - config Config, -) (*Client, error) { - log := logp.NewLogger("elasticsearch") - /*if !cfg.HasField("bulk_max_size") { - cfg.SetInt("bulk_max_size", -1, defaultBulkSize) - }*/ - - /*hosts, err := outputs.ReadHostList(cfg) - if err != nil { - return outputs.Fail(err) - }*/ - - if proxyURL := config.Transport.Proxy.URL; proxyURL != nil && !config.Transport.Proxy.Disable { - log.Debugf("breaking down proxy URL. 
Scheme: '%s', host[:port]: '%s', path: '%s'", proxyURL.Scheme, proxyURL.Host, proxyURL.Path) - log.Infof("Using proxy URL: %s", proxyURL) - } - - params := config.Params - if len(params) == 0 { - params = nil - } - - if len(config.Hosts) == 0 { - return nil, fmt.Errorf("hosts list cannot be empty") - } - host := config.Hosts[0] - esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200) - if err != nil { - log.Errorf("Invalid host param set: %s, Error: %+v", host, err) - return nil, err - } - - return NewClient(ClientSettings{ - ConnectionSettings: eslegclient.ConnectionSettings{ - URL: esURL, - Beatname: "elastic-agent-shipper", - Kerberos: config.Kerberos, - Username: config.Username, - Password: config.Password, - APIKey: config.APIKey, - Parameters: params, - Headers: config.Headers, - CompressionLevel: config.CompressionLevel, - // TODO: No observer yet, is leaving it nil ok? - EscapeHTML: config.EscapeHTML, - Transport: config.Transport, - }, - }, &connectCallbackRegistry) - /*clients := make([]*Client, len(hosts)) - for i, host := range hosts { - esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200) - if err != nil { - log.Errorf("Invalid host param set: %s, Error: %+v", host, err) - return nil, err - } - - //var client outputs.NetworkClient - client, err := NewClient(ClientSettings{ - ConnectionSettings: eslegclient.ConnectionSettings{ - URL: esURL, - Beatname: beat.Beat, - Kerberos: config.Kerberos, - Username: config.Username, - Password: config.Password, - APIKey: config.APIKey, - Parameters: params, - Headers: config.Headers, - CompressionLevel: config.CompressionLevel, - Observer: observer, - EscapeHTML: config.EscapeHTML, - Transport: config.Transport, - }, - Index: index, - Pipeline: pipeline, - Observer: observer, - }, &connectCallbackRegistry) - if err != nil { - return nil, err - } - - clients[i] = client - } - - return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients)*/ +// Wait 
until the output loop has finished and pending events have concluded in +// success or failure. This doesn't stop the output loop by itself, so make sure +// you only call it when the queue is closed. +func (es *ElasticSearchOutput) Wait() { + es.wg.Wait() } diff --git a/output/elasticsearch/util.go b/output/elasticsearch/util.go deleted file mode 100644 index 0c9f7ef..0000000 --- a/output/elasticsearch/util.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package elasticsearch - -import ( - "fmt" - - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" -) - -func getMetaStringValue(e *messages.Event, key string) (string, error) { - meta := e.GetMetadata() - metaMap := meta.GetData() - value, ok := metaMap[key] - if !ok { - return "", fmt.Errorf("field not found") - } - return value.GetStringValue(), nil -} diff --git a/output/kafka/output.go b/output/kafka/output.go index 13daa6f..6e43ed3 100644 --- a/output/kafka/output.go +++ b/output/kafka/output.go @@ -8,7 +8,6 @@ import ( "sync" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" "github.com/elastic/elastic-agent-shipper/queue" ) @@ -59,15 +58,14 @@ func (out *KafkaOutput) Start() error { client.log.Errorf("Shutting output down, queue closed: %v", err) break } - count := batch.Count() - events := make([]*messages.Event, count) - for i := 0; i < batch.Count(); i++ { - events[i], _ = batch.Entry(i).(*messages.Event) - } - + events := batch.Events() + remaining := uint64(len(events)) for len(events) > 0 { client.log.Debugf("Sending %d events to client to publish", len(events)) events, _ = client.publishEvents(events) + completed := remaining - uint64(len(events)) + remaining = uint64(len(events)) + 
batch.Done(completed) // TODO: error handling / retry backoff? client.log.Debugf("Finished sending batch with %v errors", len(events)) } @@ -78,7 +76,7 @@ func (out *KafkaOutput) Start() error { // shipper to track events by their queue IDs so outputs // can report status back to the server; see // https://github.com/elastic/elastic-agent-shipper/issues/27. - batch.Done() + batch.Done(remaining) } }() return nil diff --git a/queue/queue.go b/queue/queue.go index d02082c..54267e6 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -7,6 +7,7 @@ package queue import ( "context" "fmt" + "sync/atomic" beatsqueue "github.com/elastic/beats/v7/libbeat/publisher/queue" diskqueue "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" @@ -93,8 +94,12 @@ func (queue *Queue) Metrics() (Metrics, error) { return Metrics(metrics), err } -func (queue *Queue) Get(eventCount int) (beatsqueue.Batch, error) { - return queue.eventQueue.Get(eventCount) +func (queue *Queue) Get(eventCount int) (*WrappedBatch, error) { + batch, err := queue.eventQueue.Get(eventCount) + if err != nil { + return nil, err + } + return &WrappedBatch{batch: batch}, nil } func (queue *Queue) Close() error { @@ -117,3 +122,36 @@ func (queue *Queue) PersistedIndex() (EntryID, error) { return EntryID(metrics.OldestEntryID), nil } } + +// WrappedBatch is a bookkeeping wrapper around a libbeat queue batch, +// to work around the fact that shipper acknowledgements are per-event +// while the queue can only track an entire batch at a time. +// The plan is to eliminate WrappedBatch once batch assembly / acknowledgment +// is moved out of the libbeat queue. +type WrappedBatch struct { + batch beatsqueue.Batch + + // how many events from the batch have been acknowledged + doneCount uint64 + + // If CompletionCallback is non-nil, wrappedBatch will call it + // when all events have been consumed. 
+ CompletionCallback func() +} + +func (w *WrappedBatch) Events() []*messages.Event { + events := make([]*messages.Event, w.batch.Count()) + for i := 0; i < w.batch.Count(); i++ { + events[i], _ = w.batch.Entry(i).(*messages.Event) + } + return events +} + +func (w *WrappedBatch) Done(count uint64) { + if atomic.AddUint64(&w.doneCount, count) >= uint64(w.batch.Count()) { + w.batch.Done() + if w.CompletionCallback != nil { + w.CompletionCallback() + } + } +} diff --git a/queue/queue_test.go b/queue/queue_test.go index d3e0ecd..2109873 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -38,13 +38,12 @@ func TestMemoryQueueSimpleBatch(t *testing.T) { batch, err := queue.Get(eventCount) assert.NoError(t, err, "couldn't get queue batch") - assert.Equal(t, batch.Count(), eventCount) + returned := batch.Events() + require.Equal(t, len(returned), eventCount) for i := 0; i < eventCount; i++ { - event, ok := batch.Entry(i).(*messages.Event) - assert.True(t, ok, "queue output should have the same concrete type as its input") // Need to use assert.True since assert.Equal* uses value comparison // for unequal pointers. - assert.True(t, event == &events[i], "memory queue should output the same pointer as its input") + assert.True(t, returned[i] == &events[i], "memory queue should output the same pointer as its input") } } @@ -121,17 +120,16 @@ func TestQueueTypes(t *testing.T) { for { batch, err := queue.Get(len(tracker)) assert.NoError(t, err, "couldn't get queue batch") - for i := 0; i < batch.Count(); i++ { + events := batch.Events() + for i := 0; i < len(events); i++ { // get each event and mark the index as received - event, ok := batch.Entry(i).(*messages.Event) - require.True(t, ok) - data := event.GetFields().GetData() + data := events[i].GetFields().GetData() testField, prs := data["message"] assert.True(t, prs) v := testField.GetNumberValue() tracker[int(v)] = true } - got = got + batch.Count() + got = got + len(events) if got == len(tracker) { break }