From c57e0099c7cd4a3db37a0bfb7e0a4c33467a6d43 Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Fri, 26 Apr 2024 13:43:05 -0700 Subject: [PATCH] remove scrape ancestors --- .../sessionmd/add_session_metadata.go | 16 +++++-- .../processors/sessionmd/processdb/db.go | 31 ------------- .../provider/ebpf_provider/ebpf_provider.go | 45 ++++++++++--------- .../procfs_provider/procfs_provider.go | 10 ++--- 4 files changed, 40 insertions(+), 62 deletions(-) diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index 025f2eecd9d..02c389353a2 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -105,13 +105,12 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) { // Do not enrich failed syscalls, as there was no actual process change related to it v, err := ev.GetValue("auditd.result") if err == nil && v == "fail" { - p.logger.Errorf("!!!! 
got failed syscall") return ev, nil } pid, err := pidToUInt32(pi) if err != nil { - return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error + return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error } err = p.provider.UpdateDB(ev, pid) @@ -119,7 +118,7 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) { return ev, err } - result, err := p.enrich(ev, pid) + result, err := p.enrich(ev) if err != nil { return ev, fmt.Errorf("enriching event: %w", err) } @@ -136,7 +135,16 @@ func (p *addSessionMetadata) String() string { processorName, p.config.Backend, p.config.PIDField) } -func (p *addSessionMetadata) enrich(ev *beat.Event, pid uint32) (*beat.Event, error) { +func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) { + pidIf, err := ev.GetValue(p.config.PIDField) + if err != nil { + return nil, err + } + pid, err := pidToUInt32(pidIf) + if err != nil { + return nil, fmt.Errorf("cannot parse pid field '%s': %w", p.config.PIDField, err) + } + fullProcess, err := p.db.GetProcess(pid) if err != nil { m := fmt.Errorf("pid %v not found in db: %w", pid, err) diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go index 462e1e85baa..55826391243 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/db.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -238,7 +238,6 @@ func (db *DB) InsertFork(fork types.ProcessForkEvent) { pid := fork.ChildPIDs.Tgid ppid := fork.ParentPIDs.Tgid - db.scrapeAncestors(db.processes[pid]) if entry, ok := db.processes[ppid]; ok { entry.PIDs = pidInfoFromProto(fork.ChildPIDs) @@ -282,7 +281,6 @@ func (db *DB) InsertExec(exec types.ProcessExecEvent) { } db.processes[exec.PIDs.Tgid] = proc - db.scrapeAncestors(proc) entryLeaderPID := db.evaluateEntryLeader(proc) if entryLeaderPID != nil { db.entryLeaderRelationships[exec.PIDs.Tgid] = 
*entryLeaderPID @@ -590,8 +588,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { fillParent(&ret, parent) break } - db.logger.Debugf("failed to find %d in DB (parent of %d), attempting to scrape", process.PIDs.Ppid, pid) - db.scrapeAncestors(process) } } @@ -601,8 +597,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { fillGroupLeader(&ret, groupLeader) break } - db.logger.Debugf("failed to find %d in DB (group leader of %d), attempting to scrape", process.PIDs.Pgid, pid) - db.scrapeAncestors(process) } } @@ -612,8 +606,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { fillSessionLeader(&ret, sessionLeader) break } - db.logger.Debugf("failed to find %d in DB (session leader of %d), attempting to scrape", process.PIDs.Sid, pid) - db.scrapeAncestors(process) } } @@ -717,29 +709,6 @@ func getTTYType(major uint16, minor uint16) TTYType { return TTYUnknown } -func (db *DB) scrapeAncestors(proc Process) { - for _, pid := range []uint32{proc.PIDs.Pgid, proc.PIDs.Ppid, proc.PIDs.Sid} { - if _, exists := db.processes[pid]; pid == 0 || exists { - continue - } - procInfo, err := db.procfs.GetProcess(pid) - if err != nil { - db.logger.Debugf("couldn't get %v from procfs: %w", pid, err) - continue - } - p := Process{ - PIDs: pidInfoFromProto(procInfo.PIDs), - Creds: credInfoFromProto(procInfo.Creds), - CTTY: ttyDevFromProto(procInfo.CTTY), - Argv: procInfo.Argv, - Cwd: procInfo.Cwd, - Env: procInfo.Env, - Filename: procInfo.Filename, - } - db.insertProcess(p) - } -} - func (db *DB) Close() { close(db.stopChan) } diff --git a/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go index 60494e907ac..33f5246af46 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go @@ -23,7 +23,6 @@ import ( const ( name = 
"add_session_metadata" eventMask = ebpf.EventMask(ebpfevents.EventTypeProcessFork | ebpfevents.EventTypeProcessExec | ebpfevents.EventTypeProcessExit) - ) type prvdr struct { @@ -154,32 +153,34 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB) (pr } const ( - maxWaitLimit = 500 * time.Millisecond - combinedWaitLimit = 5 * time.Second - backoffDuration = 2 * time.Second - resetDuration = 7 * time.Second + maxWaitLimit = 500 * time.Millisecond // Maximum time UpdateDB will wait for process + combinedWaitLimit = 5 * time.Second // Multiple UpdateDB calls will wait up to this amount within resetDuration + backoffDuration = 2 * time.Second // UpdateDB will stop waiting for processes for this time + resetDuration = 7 * time.Second // After this amount of time with no backoffs, the combinedWait will be reset ) var ( - combinedWait = 0 * time.Millisecond - inBackoff = false - backoffStart = time.Now() - since = time.Now() + combinedWait = 0 * time.Millisecond + inBackoff = false + backoffStart = time.Now() + since = time.Now() backoffSkipped = 0 ) - +// With ebpf, process events are pushed to the DB by the above goroutine, so this doesn't actually update the DB. +// It does try to sync the processor and ebpf events, so that the process is in the process db before continuing. +// It's possible that the event to enrich arrives before the process is inserted into the DB. In that case, this +// will block continuing the enrichment until the process is seen (or the timeout is reached). +// +// If for some reason a lot of time has been spent waiting for missing processes, this also has a backoff timer during +// which it will continue without waiting for missing events to arrive, so the processor doesn't become overly backed-up +// waiting for these processes. 
func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { - // With ebpf, process events are pushed to the DB by the above goroutine, but the event push could happen - // after the processor receives the event document to enrich (since auditd and ebpfevents are running separately - // and not synced). - // Instead of updating the DB, this will ensure that the DB was updated asynchronously before continuing. if s.db.HasProcess(pid) { return nil } now := time.Now() - // To avoid the processors getting backed up, there is a bockoff window with no waiting if inBackoff { if now.Sub(backoffStart) > backoffDuration { s.logger.Warnf("ended backoff, skipped %d processes", backoffSkipped) @@ -206,21 +207,21 @@ func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { start := now nextWait := 5 * time.Millisecond - for { - waited := time.Now().Sub(start) + for { + waited := time.Since(start) if s.db.HasProcess(pid) { - s.logger.Warnf("***** Got process that was missing after %v", waited) + s.logger.Debugf("got process that was missing after %v", waited) combinedWait = combinedWait + waited return nil } if waited >= maxWaitLimit { - m := fmt.Errorf("!!!!did not get process %v after %vs", pid, waited.Seconds()) - s.logger.Warnf("%s", m) + e := fmt.Errorf("process %v was not seen after %v", pid, waited) + s.logger.Warnf("%w", e) combinedWait = combinedWait + waited - return m + return e } time.Sleep(nextWait) - if nextWait * 2 + waited > maxWaitLimit { + if nextWait*2+waited > maxWaitLimit { nextWait = maxWaitLimit - waited } else { nextWait = nextWait * 2 diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go index 58f8512a215..6525b860b6d 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go @@ -50,7 +50,7 @@ func (s prvdr) 
UpdateDB(ev *beat.Event, pid uint32) error { switch syscall { case "execveat", "execve": pe := types.ProcessExecEvent{} - proc_info, err := s.reader.GetProcess(uint32(pid)) + proc_info, err := s.reader.GetProcess(pid) if err == nil { pe.PIDs = proc_info.PIDs pe.Creds = proc_info.Creds @@ -63,7 +63,7 @@ func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { s.logger.Warnf("couldn't get process info from proc for pid %v: %w", pid, err) // If process info couldn't be taken from procfs, populate with as much info as // possible from the event - pe.PIDs.Tgid = uint32(pid) + pe.PIDs.Tgid = pid var intr interface{} var i int var ok bool @@ -97,7 +97,7 @@ func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { case "exit_group": pe := types.ProcessExitEvent{ PIDs: types.PIDInfo{ - Tgid: uint32(pid), + Tgid: pid, }, } s.db.InsertExit(pe) @@ -113,8 +113,8 @@ func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { if result == "success" { setsid_ev := types.ProcessSetsidEvent{ PIDs: types.PIDInfo{ - Tgid: uint32(pid), - Sid: uint32(pid), + Tgid: pid, + Sid: pid, }, } s.db.InsertSetsid(setsid_ev)