Skip to content

Commit

Permalink
Add registry to Filebeat's diagnostic (#41795)
Browse files Browse the repository at this point in the history
This commit adds Filebeat's registry folder to the Elastic-Agent
diagnostics. It's called `registry.tar.gz` and includes all registry files
on `${path.home}/registry`.

The registry is first archived into a temporary tar file. The
temporary file is created by calling `os.CreateTemp` and will use the
OS's temporary folder. Then it is gzipped in memory and returned to
Elastic-Agent. Finally, the temporary file is removed from the disk.

If the final gzipped file is 20 MB or larger, it is skipped due to its
large size.
  • Loading branch information
belimawr authored Jan 23, 2025
1 parent 5720300 commit 572cb0e
Show file tree
Hide file tree
Showing 6 changed files with 653 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
- Filebeat's registry is now added to the Elastic-Agent diagnostics bundle {issue}33238[33238] {pull}41795[41795]
- Add `unifiedlogs` input for MacOS. {pull}41791[41791]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
Expand Down
217 changes: 217 additions & 0 deletions filebeat/beater/diagnostics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"archive/tar"
"bytes"
"compress/gzip"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"regexp"
"strings"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

// getRegexpsForRegistryFiles builds the regular expressions used to select
// which paths are treated as registry files.
//
// Every pattern is assembled from path components joined with the OS path
// separator, so the expressions match paths as produced by path/filepath on
// the current platform. The first compilation error aborts the whole
// operation and is returned wrapped.
func getRegexpsForRegistryFiles() ([]*regexp.Regexp, error) {
	// Regular expressions are used instead of globs because globs cannot
	// express the character range needed for the checkpoint file name.
	components := [][]string{
		{"^registry$"},
		{"^registry", "filebeat$"},
		{"^registry", "filebeat", "meta\\.json$"},
		{"^registry", "filebeat", "log\\.json$"},
		{"^registry", "filebeat", "active\\.dat$"},
		{"^registry", "filebeat", "[[:digit:]]*\\.json$"},
	}

	compiled := make([]*regexp.Regexp, 0, len(components))
	for _, parts := range components {
		var pattern string
		switch filepath.Separator {
		case '\\':
			// A backslash separator has to be escaped inside the expression.
			pattern = strings.Join(parts, `\\`)
		default:
			pattern = filepath.Join(parts...)
		}

		// regexp.MustCompile is deliberately avoided: the patterns are
		// expected to always compile, but an error is returned rather than
		// leaving a code path that can panic.
		expr, err := regexp.Compile(pattern)
		if err != nil {
			return nil, fmt.Errorf("cannot compile reg exp: %w", err)
		}
		compiled = append(compiled, expr)
	}

	return compiled, nil
}

// gzipRegistry archives the "registry" folder located under the path
// returned by paths.Resolve(paths.Data, "") into a temporary tar file, gzips
// that tar file in memory and returns the compressed bytes.
//
// nil is returned when any step fails (the failure is logged) or when the
// compressed archive is 20 MB or larger. The temporary tar file is removed
// before returning.
func gzipRegistry() []byte {
	// Largest compressed registry, in bytes, accepted by the diagnostics.
	const maxCompressedSize = 20_000_000 // 20 MB

	logger := logp.L().Named("diagnostics")
	dataPath := paths.Resolve(paths.Data, "")
	registryPath := filepath.Join(dataPath, "registry")

	f, err := os.CreateTemp("", "filebeat-registry-*.tar")
	if err != nil {
		// f is nil here: there is no file to archive into and calling
		// f.Name() would panic, so give up right away.
		logger.Errorw("cannot create temporary registry archive", "error.message", err)
		return nil
	}
	tarPath := f.Name()

	// Close the file, we just need the empty file created to use it later
	if err := f.Close(); err != nil {
		logger.Warnf("cannot close temporary registry archive '%s': '%s'", tarPath, err)
	}
	defer logger.Debug("finished gziping Filebeat's registry")

	defer func() {
		if err := os.Remove(tarPath); err != nil {
			logger.Warnf("cannot remove temporary registry archive '%s': '%s'", tarPath, err)
		}
	}()

	logger.Debugf("temporary file '%s' created", tarPath)
	if err := tarFolder(logger, registryPath, tarPath); err != nil {
		// A failed archive is incomplete; do not ship it as if it was valid.
		logger.Errorw(fmt.Sprintf("cannot archive Filebeat's registry at '%s'", tarPath), "error.message", err)
		return nil
	}

	var buf bytes.Buffer
	if err := gzipFile(logger, tarPath, &buf); err != nil {
		// buf may hold a truncated stream, discard it.
		logger.Errorw("cannot gzip Filebeat's registry", "error.message", err)
		return nil
	}

	// if the final file is too large, skip it
	if buf.Len() >= maxCompressedSize {
		logger.Warnf("registry is too large for diagnostics, %dmb > 20mb", buf.Len()/1_000_000)
		return nil
	}

	return buf.Bytes()
}

// gzipFile gzips the file at path src, writing the compressed data to dst.
//
// The gzip header's Name field is set to the base name of src. logger is
// currently unused. An error is returned if src cannot be opened, if copying
// the data fails, or if the gzip stream cannot be finalised (closing the gzip
// writer flushes pending data and writes the footer to dst).
func gzipFile(logger *logp.Logger, src string, dst io.Writer) error {
	reader, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("cannot open '%s': '%w'", src, err)
	}
	defer reader.Close()

	writer := gzip.NewWriter(dst)
	writer.Name = filepath.Base(src)

	if _, err := io.Copy(writer, reader); err != nil {
		// Still terminate the stream; the copy error is the one worth
		// reporting, so a failure to close is intentionally ignored here.
		_ = writer.Close()
		return fmt.Errorf("cannot gzip file '%s': '%w'", src, err)
	}

	// Close is checked explicitly instead of deferred: it performs the final
	// writes to dst, so its error means the output is incomplete.
	if err := writer.Close(); err != nil {
		return fmt.Errorf("cannot finish gzip stream for '%s': '%w'", src, err)
	}

	return nil
}

// tarFolder creates a tar archive from the folder src and stores it at dst.
//
// dst must be the full path with extension, e.g: /tmp/foo.tar
// If src is not a folder an error is returned.
//
// Only the paths accepted by the regular expressions from
// getRegexpsForRegistryFiles are added. Entries are stored under the base
// name of src, e.g. "registry/filebeat/meta.json", using forward slashes.
// Errors from finalising the archive or closing dst are returned when no
// earlier error occurred.
func tarFolder(logger *logp.Logger, src, dst string) (err error) {
	fullPath, err := filepath.Abs(src)
	if err != nil {
		return fmt.Errorf("cannot get full path from '%s': '%w'", src, err)
	}

	tarFile, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("cannot create tar file '%s': '%w'", dst, err)
	}
	defer func() {
		if closeErr := tarFile.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("cannot close tar file '%s': '%w'", dst, closeErr)
		}
	}()

	tarWriter := tar.NewWriter(tarFile)
	// Deferred after tarFile's close so it runs first: closing the tar writer
	// writes the archive trailer into tarFile.
	defer func() {
		if closeErr := tarWriter.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("cannot finish tar archive '%s': '%w'", dst, closeErr)
		}
	}()

	info, err := os.Stat(fullPath)
	if err != nil {
		return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err)
	}

	if !info.IsDir() {
		return fmt.Errorf("'%s' is not a directory", fullPath)
	}
	// Derived from fullPath rather than src so that a relative or non-clean
	// src yields the same names inside the archive.
	baseDir := filepath.Base(fullPath)

	logger.Debugf("starting to walk '%s'", fullPath)

	// This error should never happen at runtime, if something
	// breaks it should break the tests and be fixed before a
	// release. We handle the error here to avoid a code path
	// that can end into a panic.
	registryFileRegExps, err := getRegexpsForRegistryFiles()
	if err != nil {
		return err
	}

	return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error {
		// Stop if there is any errors
		if prevErr != nil {
			return prevErr
		}

		// path is always rooted at fullPath, so compute the relative part
		// against it; trimming src only works when src is already absolute
		// and clean.
		relPath, err := filepath.Rel(fullPath, path)
		if err != nil {
			return fmt.Errorf("cannot get path of '%s' relative to '%s': '%w'", path, fullPath, err)
		}

		pathInTar := filepath.Join(baseDir, relPath)
		if !matchRegistyFiles(registryFileRegExps, pathInTar) {
			return nil
		}

		header, err := tar.FileInfoHeader(info, info.Name())
		if err != nil {
			return fmt.Errorf("cannot create tar info header: '%w'", err)
		}
		// FileInfoHeader only sets the base name. Store the full in-archive
		// path, with forward slashes as tar archives expect.
		header.Name = filepath.ToSlash(pathInTar)

		if err := tarWriter.WriteHeader(header); err != nil {
			return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err)
		}

		// Directories only need their header.
		if info.IsDir() {
			return nil
		}

		file, err := os.Open(path)
		if err != nil {
			return fmt.Errorf("cannot open '%s' for reading: '%w'", path, err)
		}
		defer file.Close()

		logger.Debugf("adding '%s' to the tar archive", file.Name())
		if _, err := io.Copy(tarWriter, file); err != nil {
			return fmt.Errorf("cannot read '%s': '%w'", path, err)
		}

		return nil
	})
}

// matchRegistyFiles reports whether path is matched by at least one of the
// regular expressions in registryFileRegExps. It returns false when the list
// is empty.
func matchRegistyFiles(registryFileRegExps []*regexp.Regexp, path string) bool {
	for i := 0; i < len(registryFileRegExps); i++ {
		if !registryFileRegExps[i].MatchString(path) {
			continue
		}
		return true
	}

	return false
}
66 changes: 66 additions & 0 deletions filebeat/beater/diagnostics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"fmt"
"path/filepath"
"testing"
)

// TestMatchRegistryFiles checks that the regular expressions returned by
// getRegexpsForRegistryFiles, applied through matchRegistyFiles, accept the
// known registry file names and reject files with other names or locations.
func TestMatchRegistryFiles(t *testing.T) {
	testCases := []struct {
		path  string
		match bool
	}{
		// Registry files that must be selected.
		{path: filepath.Join("registry", "filebeat", "49855.json"), match: true},
		{path: filepath.Join("registry", "filebeat", "active.dat"), match: true},
		{path: filepath.Join("registry", "filebeat", "meta.json"), match: true},
		{path: filepath.Join("registry", "filebeat", "log.json"), match: true},

		// Wrong file name or wrong folder, must be ignored.
		{path: filepath.Join("registry", "filebeat", "bar.dat"), match: false},
		{path: filepath.Join("registry", "filebeat", "log.txt"), match: false},
		{path: filepath.Join("registry", "42.json"), match: false},
		{path: filepath.Join("nop", "active.dat"), match: false},
	}

	registryFileRegExps, err := getRegexpsForRegistryFiles()
	if err != nil {
		t.Fatalf("cannot compile regexps for registry paths: %s", err)
	}

	for _, tc := range testCases {
		t.Run(fmt.Sprintf("%s returns %t", tc.path, tc.match), func(t *testing.T) {
			result := matchRegistyFiles(registryFileRegExps, tc.path)
			if result != tc.match {
				t.Errorf(
					"matchRegistyFiles('%s') should return %t, got %t instead",
					tc.path,
					tc.match,
					result)
			}
		})
	}
}
7 changes: 7 additions & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
}
return data
})

b.Manager.RegisterDiagnosticHook(
"registry",
"Filebeat's registry",
"registry.tar.gz",
"application/octet-stream",
gzipRegistry)
}

// Add inputs created by the modules
Expand Down
8 changes: 7 additions & 1 deletion x-pack/filebeat/input/benchmark/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ type inputMetrics struct {

// newInputMetrics returns an input metric for the benchmark processor.
func newInputMetrics(ctx v2.Context) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, ctx.Agent.Monitoring.Namespace.GetRegistry())
var globalRegistry *monitoring.Registry
// When running under Elastic-Agent Namespace can be nil.
// Passing a nil registry to inputmon.NewInputRegistry is not a problem.
if ctx.Agent.Monitoring.Namespace != nil {
globalRegistry = ctx.Agent.Monitoring.Namespace.GetRegistry()
}
reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, globalRegistry)
out := &inputMetrics{
unregister: unreg,
eventsPublished: monitoring.NewUint(reg, "events_published_total"),
Expand Down
Loading

0 comments on commit 572cb0e

Please sign in to comment.