Skip to content

Commit

Permalink
Add registry to Filebeat's diagnostic (#41795) (#42413)
Browse files Browse the repository at this point in the history
This commit adds Filebeat's registry folder to the Elastic-Agent
diagnostics. It's called `registry.tar.gz` and includes all registry files
on `${path.home}/registry`.

The registry is first archived into a temporary tar file. The
temporary file is created by calling `os.CreateTemp` and will use the
OS's temporary folder. Then it's gziped in memory and returned to
Elastic-Agent, finally the temporary file is removed from the disk.

If the final gziped file is more than 20mb, it is skipped due to its large
size.

(cherry picked from commit 572cb0e)

Co-authored-by: Tiago Queiroz <[email protected]>
  • Loading branch information
mergify[bot] and belimawr authored Jan 27, 2025
1 parent 665cbd4 commit a633515
Show file tree
Hide file tree
Showing 6 changed files with 653 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
- Filebeat's registry is now added to the Elastic-Agent diagnostics bundle {issue}33238[33238] {pull}41795[41795]
- Add `unifiedlogs` input for MacOS. {pull}41791[41791]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
Expand Down
217 changes: 217 additions & 0 deletions filebeat/beater/diagnostics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"archive/tar"
"bytes"
"compress/gzip"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"regexp"
"strings"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

func getRegexpsForRegistryFiles() ([]*regexp.Regexp, error) {
// We use regexps here because globs do not support specifying a character
// range like we do in the checkpoint file.

registryFileRegExps := []*regexp.Regexp{}
preFilesList := [][]string{
[]string{"^registry$"},
[]string{"^registry", "filebeat$"},
[]string{"^registry", "filebeat", "meta\\.json$"},
[]string{"^registry", "filebeat", "log\\.json$"},
[]string{"^registry", "filebeat", "active\\.dat$"},
[]string{"^registry", "filebeat", "[[:digit:]]*\\.json$"},
}

for _, lst := range preFilesList {
var path string
if filepath.Separator == '\\' {
path = strings.Join(lst, `\\`)
} else {
path = filepath.Join(lst...)
}

// Compile the reg exp, if there is an error, stop and return.
// There should be no error here as this code is tested in all
// supported OSes, however to avoid a code path that leads to a
// panic, we cannot use `regexp.MustCompile` and handle the error
re, err := regexp.Compile(path)
if err != nil {
return nil, fmt.Errorf("cannot compile reg exp: %w", err)
}

registryFileRegExps = append(registryFileRegExps, re)
}

return registryFileRegExps, nil
}

func gzipRegistry() []byte {
logger := logp.L().Named("diagnostics")
buf := bytes.Buffer{}
dataPath := paths.Resolve(paths.Data, "")
registryPath := filepath.Join(dataPath, "registry")
f, err := os.CreateTemp("", "filebeat-registry-*.tar")
if err != nil {
logger.Errorw("cannot create temporary registry archive", "error.message", err)
}
// Close the file, we just need the empty file created to use it later
f.Close()
defer logger.Debug("finished gziping Filebeat's registry")

defer func() {
if err := os.Remove(f.Name()); err != nil {
logger.Warnf("cannot remove temporary registry archive '%s': '%s'", f.Name(), err)
}
}()

logger.Debugf("temporary file '%s' created", f.Name())
if err := tarFolder(logger, registryPath, f.Name()); err != nil {
logger.Errorw(fmt.Sprintf("cannot archive Filebeat's registry at '%s'", f.Name()), "error.message", err)
}

if err := gzipFile(logger, f.Name(), &buf); err != nil {
logger.Errorw("cannot gzip Filebeat's registry", "error.message", err)
}

// if the final file is too large, skip it
if buf.Len() >= 20_000_000 { // 20 Mb
logger.Warnf("registry is too large for diagnostics, %dmb bytes > 20mb", buf.Len()/1_000_000)
return nil
}

return buf.Bytes()
}

// gzipFile gzips src writing the compressed data to dst
func gzipFile(logger *logp.Logger, src string, dst io.Writer) error {
reader, err := os.Open(src)
if err != nil {
return fmt.Errorf("cannot open '%s': '%w'", src, err)
}
defer reader.Close()

writer := gzip.NewWriter(dst)
defer writer.Close()
writer.Name = filepath.Base(src)

if _, err := io.Copy(writer, reader); err != nil {
if err != nil {
return fmt.Errorf("cannot gzip file '%s': '%w'", src, err)
}
}

return nil
}

// tarFolder creates a tar archive from the folder src and stores it at dst.
//
// dst must be the full path with extension, e.g: /tmp/foo.tar
// If src is not a folder an error is retruned
func tarFolder(logger *logp.Logger, src, dst string) error {
fullPath, err := filepath.Abs(src)
if err != nil {
return fmt.Errorf("cannot get full path from '%s': '%w'", src, err)
}

tarFile, err := os.Create(dst)
if err != nil {
return fmt.Errorf("cannot create tar file '%s': '%w'", dst, err)
}
defer tarFile.Close()

tarWriter := tar.NewWriter(tarFile)
defer tarWriter.Close()

info, err := os.Stat(fullPath)
if err != nil {
return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err)
}

if !info.IsDir() {
return fmt.Errorf("'%s' is not a directory", fullPath)
}
baseDir := filepath.Base(src)

logger.Debugf("starting to walk '%s'", fullPath)

// This error should never happen at runtime, if something
// breaks it should break the tests and be fixed before a
// release. We handle the error here to avoid a code path
// that can end into a panic.
registryFileRegExps, err := getRegexpsForRegistryFiles()
if err != nil {
return err
}

return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error {
// Stop if there is any errors
if prevErr != nil {
return prevErr
}

pathInTar := filepath.Join(baseDir, strings.TrimPrefix(path, src))
if !matchRegistyFiles(registryFileRegExps, pathInTar) {
return nil
}
header, err := tar.FileInfoHeader(info, info.Name())
if err != nil {
return fmt.Errorf("cannot create tar info header: '%w'", err)
}
header.Name = pathInTar

if err := tarWriter.WriteHeader(header); err != nil {
return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err)
}

if info.IsDir() {
return nil
}

file, err := os.Open(path)
if err != nil {
return fmt.Errorf("cannot open '%s' for reading: '%w", path, err)
}
defer file.Close()

logger.Debugf("adding '%s' to the tar archive", file.Name())
if _, err := io.Copy(tarWriter, file); err != nil {
return fmt.Errorf("cannot read '%s': '%w'", path, err)
}

return nil
})
}

func matchRegistyFiles(registryFileRegExps []*regexp.Regexp, path string) bool {
for _, regExp := range registryFileRegExps {
if regExp.MatchString(path) {
return true
}
}
return false
}
66 changes: 66 additions & 0 deletions filebeat/beater/diagnostics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"fmt"
"path/filepath"
"testing"
)

func TestMatchRegistryFiles(t *testing.T) {
positiveMatches := []string{
filepath.Join("registry", "filebeat", "49855.json"),
filepath.Join("registry", "filebeat", "active.dat"),
filepath.Join("registry", "filebeat", "meta.json"),
filepath.Join("registry", "filebeat", "log.json"),
}
negativeMatches := []string{
filepath.Join("registry", "filebeat", "bar.dat"),
filepath.Join("registry", "filebeat", "log.txt"),
filepath.Join("registry", "42.json"),
filepath.Join("nop", "active.dat"),
}
registryFileRegExps, err := getRegexpsForRegistryFiles()
if err != nil {
t.Fatalf("cannot compile regexps for registry paths: %s", err)
}

testFn := func(t *testing.T, path string, match bool) {
result := matchRegistyFiles(registryFileRegExps, path)
if result != match {
t.Errorf(
"mathRegisryFiles('%s') should return %t, got %t instead",
path,
match,
result)
}
}

for _, path := range positiveMatches {
t.Run(fmt.Sprintf("%s returns true", path), func(t *testing.T) {
testFn(t, path, true)
})
}

for _, path := range negativeMatches {
t.Run(fmt.Sprintf("%s returns false", path), func(t *testing.T) {
testFn(t, path, false)
})
}
}
7 changes: 7 additions & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
}
return data
})

b.Manager.RegisterDiagnosticHook(
"registry",
"Filebeat's registry",
"registry.tar.gz",
"application/octet-stream",
gzipRegistry)
}

// Add inputs created by the modules
Expand Down
8 changes: 7 additions & 1 deletion x-pack/filebeat/input/benchmark/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ type inputMetrics struct {

// newInputMetrics returns an input metric for the benchmark processor.
func newInputMetrics(ctx v2.Context) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, ctx.Agent.Monitoring.Namespace.GetRegistry())
var globalRegistry *monitoring.Registry
// When running under Elastic-Agent Namespace can be nil.
// Passing a nil registry to inputmon.NewInputRegistry is not a problem.
if ctx.Agent.Monitoring.Namespace != nil {
globalRegistry = ctx.Agent.Monitoring.Namespace.GetRegistry()
}
reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, globalRegistry)
out := &inputMetrics{
unregister: unreg,
eventsPublished: monitoring.NewUint(reg, "events_published_total"),
Expand Down
Loading

0 comments on commit a633515

Please sign in to comment.