Skip to content

Commit

Permalink
remove scrape ancestors
Browse files Browse the repository at this point in the history
  • Loading branch information
mjwolf committed Apr 26, 2024
1 parent e01540a commit c57e009
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 62 deletions.
16 changes: 12 additions & 4 deletions x-pack/auditbeat/processors/sessionmd/add_session_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,20 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) {
// Do not enrich failed syscalls, as there was no actual process change related to it
v, err := ev.GetValue("auditd.result")
if err == nil && v == "fail" {
p.logger.Errorf("!!!! got failed syscall")
return ev, nil
}

pid, err := pidToUInt32(pi)
if err != nil {
return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error
return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error
}

err = p.provider.UpdateDB(ev, pid)
if err != nil {
return ev, err
}

result, err := p.enrich(ev, pid)
result, err := p.enrich(ev)
if err != nil {
return ev, fmt.Errorf("enriching event: %w", err)
}
Expand All @@ -136,7 +135,16 @@ func (p *addSessionMetadata) String() string {
processorName, p.config.Backend, p.config.PIDField)
}

func (p *addSessionMetadata) enrich(ev *beat.Event, pid uint32) (*beat.Event, error) {
func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) {
pidIf, err := ev.GetValue(p.config.PIDField)
if err != nil {
return nil, err
}
pid, err := pidToUInt32(pidIf)
if err != nil {
return nil, fmt.Errorf("cannot parse pid field '%s': %w", p.config.PIDField, err)
}

fullProcess, err := p.db.GetProcess(pid)
if err != nil {
m := fmt.Errorf("pid %v not found in db: %w", pid, err)
Expand Down
31 changes: 0 additions & 31 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func (db *DB) InsertFork(fork types.ProcessForkEvent) {

pid := fork.ChildPIDs.Tgid
ppid := fork.ParentPIDs.Tgid
db.scrapeAncestors(db.processes[pid])

if entry, ok := db.processes[ppid]; ok {
entry.PIDs = pidInfoFromProto(fork.ChildPIDs)
Expand Down Expand Up @@ -282,7 +281,6 @@ func (db *DB) InsertExec(exec types.ProcessExecEvent) {
}

db.processes[exec.PIDs.Tgid] = proc
db.scrapeAncestors(proc)
entryLeaderPID := db.evaluateEntryLeader(proc)
if entryLeaderPID != nil {
db.entryLeaderRelationships[exec.PIDs.Tgid] = *entryLeaderPID
Expand Down Expand Up @@ -590,8 +588,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
fillParent(&ret, parent)
break
}
db.logger.Debugf("failed to find %d in DB (parent of %d), attempting to scrape", process.PIDs.Ppid, pid)
db.scrapeAncestors(process)
}
}

Expand All @@ -601,8 +597,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
fillGroupLeader(&ret, groupLeader)
break
}
db.logger.Debugf("failed to find %d in DB (group leader of %d), attempting to scrape", process.PIDs.Pgid, pid)
db.scrapeAncestors(process)
}
}

Expand All @@ -612,8 +606,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
fillSessionLeader(&ret, sessionLeader)
break
}
db.logger.Debugf("failed to find %d in DB (session leader of %d), attempting to scrape", process.PIDs.Sid, pid)
db.scrapeAncestors(process)
}
}

Expand Down Expand Up @@ -717,29 +709,6 @@ func getTTYType(major uint16, minor uint16) TTYType {
return TTYUnknown
}

func (db *DB) scrapeAncestors(proc Process) {
for _, pid := range []uint32{proc.PIDs.Pgid, proc.PIDs.Ppid, proc.PIDs.Sid} {
if _, exists := db.processes[pid]; pid == 0 || exists {
continue
}
procInfo, err := db.procfs.GetProcess(pid)
if err != nil {
db.logger.Debugf("couldn't get %v from procfs: %w", pid, err)
continue
}
p := Process{
PIDs: pidInfoFromProto(procInfo.PIDs),
Creds: credInfoFromProto(procInfo.Creds),
CTTY: ttyDevFromProto(procInfo.CTTY),
Argv: procInfo.Argv,
Cwd: procInfo.Cwd,
Env: procInfo.Env,
Filename: procInfo.Filename,
}
db.insertProcess(p)
}
}

func (db *DB) Close() {
close(db.stopChan)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
const (
name = "add_session_metadata"
eventMask = ebpf.EventMask(ebpfevents.EventTypeProcessFork | ebpfevents.EventTypeProcessExec | ebpfevents.EventTypeProcessExit)

)

type prvdr struct {
Expand Down Expand Up @@ -154,32 +153,34 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB) (pr
}

const (
maxWaitLimit = 500 * time.Millisecond
combinedWaitLimit = 5 * time.Second
backoffDuration = 2 * time.Second
resetDuration = 7 * time.Second
maxWaitLimit = 500 * time.Millisecond // Maximum time UpdateDB will wait for process
combinedWaitLimit = 5 * time.Second // Multiple UpdateDB calls will wait up to this amount within resetDuration
backoffDuration = 2 * time.Second // UpdateDB will stop waiting for processes for this time
resetDuration = 7 * time.Second // After this amount of times with no backoffs, the combinedWait will be reset
)

var (
combinedWait = 0 * time.Millisecond
inBackoff = false
backoffStart = time.Now()
since = time.Now()
combinedWait = 0 * time.Millisecond
inBackoff = false
backoffStart = time.Now()
since = time.Now()
backoffSkipped = 0
)


// With ebpf, process events are pushed to the DB by the above goroutine, so this doesn't actually update the DB.
// It does try sync the processor and ebpf events, so that the process is in the process db before continuing.
// It's possible that the event to enrich arrives before the process is inserted into the DB. In that case, this
// will block continuing the enrichment until the process is seen (or the timeout is reached).
//
// If for some reason a lot of time has been spent waiting for missing processes, this also has a backoff timer during
// which it will continue without waiting for missing events to arrive, so the processor doesn't become overly backed-up
// waiting for these processes.
func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
// With ebpf, process events are pushed to the DB by the above goroutine, but the event push could happen
// after the processor receives the event document to enrich (since auditd and ebpfevents are running separately
// and not synced).
// Instead of updating the DB, this will ensure that the DB was updated asynchronously before continuing.
if s.db.HasProcess(pid) {
return nil
}

now := time.Now()
// To avoid the processors getting backed up, there is a bockoff window with no waiting
if inBackoff {
if now.Sub(backoffStart) > backoffDuration {
s.logger.Warnf("ended backoff, skipped %d processes", backoffSkipped)
Expand All @@ -206,21 +207,21 @@ func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {

start := now
nextWait := 5 * time.Millisecond
for {
waited := time.Now().Sub(start)
for {
waited := time.Since(start)
if s.db.HasProcess(pid) {
s.logger.Warnf("***** Got process that was missing after %v", waited)
s.logger.Debugf("got process that was missing after %v", waited)
combinedWait = combinedWait + waited
return nil
}
if waited >= maxWaitLimit {
m := fmt.Errorf("!!!!did not get process %v after %vs", pid, waited.Seconds())
s.logger.Warnf("%s", m)
e := fmt.Errorf("process %v was not seen after %v", pid, waited)
s.logger.Warnf("%w", e)
combinedWait = combinedWait + waited
return m
return e
}
time.Sleep(nextWait)
if nextWait * 2 + waited > maxWaitLimit {
if nextWait*2+waited > maxWaitLimit {
nextWait = maxWaitLimit - waited
} else {
nextWait = nextWait * 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
switch syscall {
case "execveat", "execve":
pe := types.ProcessExecEvent{}
proc_info, err := s.reader.GetProcess(uint32(pid))
proc_info, err := s.reader.GetProcess(pid)
if err == nil {
pe.PIDs = proc_info.PIDs
pe.Creds = proc_info.Creds
Expand All @@ -63,7 +63,7 @@ func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
s.logger.Warnf("couldn't get process info from proc for pid %v: %w", pid, err)
// If process info couldn't be taken from procfs, populate with as much info as
// possible from the event
pe.PIDs.Tgid = uint32(pid)
pe.PIDs.Tgid = pid
var intr interface{}
var i int
var ok bool
Expand Down Expand Up @@ -97,7 +97,7 @@ func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
case "exit_group":
pe := types.ProcessExitEvent{
PIDs: types.PIDInfo{
Tgid: uint32(pid),
Tgid: pid,
},
}
s.db.InsertExit(pe)
Expand All @@ -113,8 +113,8 @@ func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
if result == "success" {
setsid_ev := types.ProcessSetsidEvent{
PIDs: types.PIDInfo{
Tgid: uint32(pid),
Sid: uint32(pid),
Tgid: pid,
Sid: pid,
},
}
s.db.InsertSetsid(setsid_ev)
Expand Down

0 comments on commit c57e009

Please sign in to comment.