Skip to content

Commit

Permalink
Fix entry deletion in the memqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm committed Oct 21, 2024
1 parent 5de2287 commit c40fe34
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
10 changes: 6 additions & 4 deletions libbeat/publisher/queue/memqueue/runloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,14 @@ func (l *runLoop) handleGetReply(req *getRequest) {

func (l *runLoop) handleDelete(count int) {
byteCount := 0
// remove entries from the queue
for i := 0; i < count; i++ {
entry := l.broker.buf[(l.bufPos+i)%len(l.broker.buf)]
byteCount += entry.eventSize
index := (l.bufPos + i) % len(l.broker.buf)
byteCount += l.broker.buf[index].eventSize
l.broker.buf[index].event = nil
}
// Advance position and counters. Event data was already cleared in
// batch.FreeEntries when the events were vended.

// advance the buffer position and update tracking fields
l.bufPos = (l.bufPos + count) % len(l.broker.buf)
l.eventCount -= count
l.consumedCount -= count
Expand Down
10 changes: 10 additions & 0 deletions libbeat/publisher/queue/memqueue/runloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package memqueue

import (
"context"
"slices"
"testing"
"time"

Expand Down Expand Up @@ -176,11 +177,20 @@ func TestObserverRemoveEvents(t *testing.T) {
// Initialize the queue entries to a test byte size
for i := range rl.broker.buf {
rl.broker.buf[i].eventSize = 123
rl.broker.buf[i].event = publisher.Event{}
}
const deleteCount = 25
rl.broker.deleteChan <- deleteCount
// Run one iteration of the run loop, so it can handle the delete request
rl.runIteration()

// The entries should actually be deleted
expectedRemaining := len(rl.broker.buf) - deleteCount
remainingEntries := slices.DeleteFunc(slices.Clone(rl.broker.buf), func(entry queueEntry) bool {
return entry.event == nil
})
assert.Len(t, remainingEntries, expectedRemaining)

// It should have deleted 25 events, so we expect the size to be 25 * 123.
assertRegistryUint(t, reg, "queue.removed.events", deleteCount, "Deleting from the queue should report the removed events")
assertRegistryUint(t, reg, "queue.removed.bytes", deleteCount*123, "Deleting from the queue should report the removed bytes")
Expand Down

0 comments on commit c40fe34

Please sign in to comment.