Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read journal entries from all boots #41244

Merged
merged 8 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192]
- Log bad handshake details when websocket connection fails {pull}41300[41300]
- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179]
- Journald input now can read events from all boots {issue}41083[41083] {pull}41244[41244]

*Heartbeat*

Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -107,7 +107,7 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
e.t.Helper()
msg := strings.Builder{}
fmt.Fprintf(&msg, "did not find the expected %d events", count)
assert.Eventually(e.t, func() bool {
require.Eventually(e.t, func() bool {
sum := len(e.pipeline.GetAllEvents())
if sum == count {
return true
Expand Down
8 changes: 5 additions & 3 deletions filebeat/input/journald/input_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,11 @@ func TestInputSeek(t *testing.T) {

env.waitUntilEventCount(len(testCase.expectedMessages))

for idx, event := range env.pipeline.GetAllEvents() {
if got, expected := event.Fields["message"], testCase.expectedMessages[idx]; got != expected {
t.Fatalf("expecting event message %q, got %q", expected, got)
if !t.Failed() {
for idx, event := range env.pipeline.GetAllEvents() {
if got, expected := event.Fields["message"], testCase.expectedMessages[idx]; got != expected {
t.Fatalf("expecting event message %q, got %q", expected, got)
}
}
}
})
Expand Down
36 changes: 23 additions & 13 deletions filebeat/input/journald/input_parsers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,41 @@ import (
// it only tests a single parser, but that is enough to ensure
// we're correctly using the parsers
func TestInputParsers(t *testing.T) {
inputParsersExpected := []string{"1st line\n2nd line\n3rd line", "4th line\n5th line\n6th line"}
env := newInputTestingEnvironment(t)

inp := env.mustCreateInput(mapstr.M{
"paths": []string{path.Join("testdata", "input-multiline-parser.journal")},
"include_matches.match": []string{"_SYSTEMD_USER_UNIT=log-service.service"},
"paths": []string{path.Join("testdata", "ndjson-parser.journal")},
"parsers": []mapstr.M{
{
"multiline": mapstr.M{
"type": "count",
"count_lines": 3,
"ndjson": mapstr.M{
"target": "",
},
},
},
})

ctx, cancelInput := context.WithCancel(context.Background())
t.Cleanup(cancelInput)
env.startInput(ctx, inp)
env.waitUntilEventCount(len(inputParsersExpected))
env.waitUntilEventCount(1)
event := env.pipeline.clients[0].GetEvents()[0]

foo, isString := event.Fields["foo"].(string)
if !isString {
t.Errorf("expecting field 'foo' to be string, got %T", event.Fields["foo"])
}

for idx, event := range env.pipeline.clients[0].GetEvents() {
if got, expected := event.Fields["message"], inputParsersExpected[idx]; got != expected {
t.Errorf("expecting event message %q, got %q", expected, got)
}
answer, isInt := event.Fields["answer"].(int64)
if !isInt {
t.Errorf("expecting field 'answer' to be int64, got %T", event.Fields["answer"])
}

cancelInput()
// The JSON in the test journal is: '{"foo": "bar", "answer":42}'
expectedFoo := "bar"
expectedAnswer := int64(42)
if foo != expectedFoo {
t.Errorf("expecting 'foo' from the Journal JSON to be '%s' got '%s' instead", expectedFoo, foo)
}
if answer != expectedAnswer {
t.Errorf("expecting 'answer' from the Journal JSON to be '%d' got '%d' instead", expectedAnswer, answer)
}
}
64 changes: 12 additions & 52 deletions filebeat/input/journald/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,59 +39,19 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

// How to write to journal from CLI:
// https://www.baeldung.com/linux/systemd-journal-message-terminal
func TestInputCanReadAllBoots(t *testing.T) {
env := newInputTestingEnvironment(t)
cfg := mapstr.M{
"paths": []string{path.Join("testdata", "multiple-boots.journal")},
}
inp := env.mustCreateInput(cfg)

// TestGenerateJournalEntries generates entries in the user's journal.
// It is kept commented out at the top of the file as reference and
// easy access.
//
// How to generate a journal file with only the entries you want:
// 1. Add the dependencies for this test
// go get github.com/ssgreg/journald
// 2. Uncomment and run the test:
// 3. Add the following import:
// journaldlogger "github.com/ssgreg/journald"
// 4. Get a VM, ssh into it, make sure you can access the test from it
// 5. Find the journal file, usually at /var/log/journal/<machine ID>/user-1000.journal
// 7. Clean and rotate the journal
// sudo journalctl --vacuum-time=1s
// sudo journalctl --rotate
// 8. Run this test: `go test -run=TestGenerateJournalEntries`
// 9. Copy the journal file somewhere else
// cp /var/log/journal/21282bcb80a74c08a0d14a047372256c/user-1000.journal /tmp/foo.journal
// 10. Read the journal file:
// journalctl --file=/tmp/foo.journal -n 10
// 11. Read the journal with all fields as JSON
// journalctl --file=/tmp/foo.journal -n 10 -o json
// func TestGenerateJournalEntries(t *testing.T) {
// fields := []map[string]any{
// {
// "BAR": "bar",
// },
// {
// "FOO": "foo",
// },
// {
// "BAR": "bar",
// "FOO": "foo",
// },
// {
// "FOO_BAR": "foo",
// },
// {
// "FOO_BAR": "bar",
// },
// {
// "FOO_BAR": "foo bar",
// },
// }
// for i, m := range fields {
// if err := journaldlogger.Send(fmt.Sprintf("message %d", i), journaldlogger.PriorityInfo, m); err != nil {
// t.Fatal(err)
// }
// }
// }
ctx, cancelInput := context.WithCancel(context.Background())
t.Cleanup(cancelInput)

env.startInput(ctx, inp)
env.waitUntilEventCount(6)
}

func TestInputFieldsTranslation(t *testing.T) {
// A few random keys to verify
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/journald/pkg/journalctl/jctlmock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 25 additions & 7 deletions filebeat/input/journald/pkg/journalctl/journalctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io"
"os/exec"
"strings"
"sync"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -37,6 +38,7 @@ type journalctl struct {

logger *logp.Logger
canceler input.Canceler
waitDone sync.WaitGroup
}

// Factory returns an instance of journalctl ready to use.
Expand Down Expand Up @@ -95,7 +97,7 @@ func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args
data, err := reader.ReadBytes('\n')
if err != nil {
if !errors.Is(err, io.EOF) {
logger.Errorf("cannot read from journalctl stdout: %s", err)
logger.Errorf("cannot read from journalctl stdout: '%s'", err)
}
return
}
Expand All @@ -118,10 +120,13 @@ func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args

// Whenever the journalctl process exits, the `Wait` call returns,
// if there was an error it is logged and this goroutine exits.
jctl.waitDone.Add(1)
go func() {
defer jctl.waitDone.Done()
if err := cmd.Wait(); err != nil {
jctl.logger.Errorf("journalctl exited with an error, exit code %d ", cmd.ProcessState.ExitCode())
}
jctl.logger.Debugf("journalctl exit code: %d", cmd.ProcessState.ExitCode())
}()

return &jctl, nil
Expand All @@ -130,18 +135,31 @@ func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args
// Kill Terminates the journalctl process using a SIGKILL.
func (j *journalctl) Kill() error {
j.logger.Debug("sending SIGKILL to journalctl")
err := j.cmd.Process.Kill()
return err
return j.cmd.Process.Kill()
}

func (j *journalctl) Next(cancel input.Canceler) ([]byte, error) {
// Next returns the next journal entry (as JSON). If `finished` is true, then
// journalctl finished returning all data and exited successfully, if journalctl
// exited unexpectedly, then `err` is non-nil, `finished` is false and an empty
// byte array is returned.
func (j *journalctl) Next(cancel input.Canceler) ([]byte, bool, error) {
select {
case <-cancel.Done():
return []byte{}, ErrCancelled
return []byte{}, false, ErrCancelled
case d, open := <-j.dataChan:
if !open {
return []byte{}, errors.New("no more data to read, journalctl might have exited unexpectedly")
// Wait for the process to exit, so we can read the exit code.
j.waitDone.Wait()
if j.cmd.ProcessState.ExitCode() == 0 {
return []byte{}, true, nil
}
return []byte{},
false,
fmt.Errorf(
"no more data to read, journalctl exited unexpectedly, exit code: %d",
j.cmd.ProcessState.ExitCode())
}
return d, nil

return d, false, nil
}
}
Loading
Loading