Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory queue: free event data sooner after acknowledgments #38042

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ func newACKLoop(broker *broker) *ackLoop {
func (l *ackLoop) run() {
b := l.broker
for {
nextBatchChan := l.pendingBatches.nextBatchChannel()

select {
case <-b.ctx.Done():
// The queue is shutting down.
Expand All @@ -48,19 +46,45 @@ func (l *ackLoop) run() {
// New batches have been generated, add them to the pending list
l.pendingBatches.concat(&chanList)

case <-nextBatchChan:
// The oldest outstanding batch has been acknowledged, advance our
// position as much as we can.
l.handleBatchSig()
// Subtlety: because runLoop delivers batches to consumedChan
// asynchronously, it's possible that they were already acknowledged
// before being added to pendingBatches, and in that case we should
// advance our position immediately.
l.maybeAdvanceBatchPosition()

case acked := <-b.ackedChan:
// A batch has been acknowledged. Mark it as done, remove its events
// from the queue buffer, and check if the queue position can be
// advanced.
l.handleBatchACK(acked)
l.maybeAdvanceBatchPosition()
}
}
}

// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {
ackedBatches := l.collectAcked()
// Collect all contiguous acknowledged batches from the start of the
// pending list.
func (l *ackLoop) collectACKed() batchList {
ackedBatches := batchList{}

for !l.pendingBatches.empty() {
batch := l.pendingBatches.front()
// This check is safe since only the ackLoop goroutine can modify
// "done" after the batch is created.
if !batch.done {
break
}
ackedBatches.append(l.pendingBatches.pop())
}

return ackedBatches
}

func (l *ackLoop) maybeAdvanceBatchPosition() {
ackedBatches := l.collectACKed()
if ackedBatches.empty() {
return
}
count := 0
for batch := ackedBatches.front(); batch != nil; batch = batch.next {
count += batch.count
Expand All @@ -84,28 +108,15 @@ func (l *ackLoop) handleBatchSig() int {
l.broker.logger.Debug("ackloop: return ack to broker loop:", count)

l.broker.logger.Debug("ackloop: done send ack")
return count
}

func (l *ackLoop) collectAcked() batchList {
ackedBatches := batchList{}

acks := l.pendingBatches.pop()
ackedBatches.append(acks)

done := false
for !l.pendingBatches.empty() && !done {
acks := l.pendingBatches.front()
select {
case <-acks.doneChan:
ackedBatches.append(l.pendingBatches.pop())

default:
done = true
}
func (l *ackLoop) handleBatchACK(b *batch) {
// Clear all event pointers so the memory can be reclaimed immediately.
for i := 0; i < b.count; i++ {
entry := b.rawEntry(i)
entry.event = nil
}

return ackedBatches
b.done = true
}

// Called by ackLoop. This function exists to decouple the work of collecting
Expand Down
34 changes: 18 additions & 16 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ type broker struct {
// through this channel so ackLoop can monitor them for acknowledgments.
consumedChan chan batchList

// When a batch is acknowledged by a consumer, it is sent to this channel,
// where it is read by ackLoop. When the oldest remaining batches are
// acknowledged, ackLoop calls any appropriate acknowledgment callbacks
// and notifies runLoop to advance the queue position.
ackedChan chan *batch

// ackCallback is a configurable callback to invoke when ACKs are processed.
// ackLoop calls this function when it advances the consumer ACK position.
// Right now this forwards the notification to queueACKed() in
Expand All @@ -86,8 +92,8 @@ type broker struct {

// When batches are acknowledged, ackLoop saves any metadata needed
// for producer callbacks and such, then notifies runLoop that it's
// safe to free these events and advance the queue by sending the
// acknowledged event count to this channel.
// safe to advance the queue position by sending the acknowledged
// event count to this channel.
deleteChan chan int

///////////////////////////////
Expand Down Expand Up @@ -129,9 +135,9 @@ type batch struct {
// Position and length of the events within the queue buffer
start, count int

// batch.Done() sends to doneChan, where ackLoop reads it and handles
// acknowledgment / cleanup.
doneChan chan batchDoneMsg
// Set to true when an acknowledgment for this batch is received.
// Only read/written by the ackLoop goroutine.
done bool
}

type batchList struct {
Expand Down Expand Up @@ -222,6 +228,10 @@ func newQueue(
// internal runLoop and ackLoop channels
consumedChan: make(chan batchList),
deleteChan: make(chan int),
// ackedChan is buffered so output workers don't block on acknowledgment
// if ackLoop is busy. (10 is probably more than we really need, but
// no harm in being safe.)
ackedChan: make(chan *batch, 10),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker.

If the buffering is for the workers could we perhaps make this dynamic for the number of workers? something like number of workers + 1? I'm just wondering if you are on a 64 core Gravitron server and the number of workers is 64, is 10 enough?


ackCallback: ackCallback,
}
Expand Down Expand Up @@ -287,9 +297,7 @@ func (b *broker) Metrics() (queue.Metrics, error) {

var batchPool = sync.Pool{
New: func() interface{} {
return &batch{
doneChan: make(chan batchDoneMsg, 1),
}
return &batch{}
},
}

Expand All @@ -299,6 +307,7 @@ func newBatch(queue *broker, start, count int) *batch {
batch.queue = queue
batch.start = start
batch.count = count
batch.done = false
return batch
}

Expand Down Expand Up @@ -346,13 +355,6 @@ func (l *batchList) front() *batch {
return l.head
}

func (l *batchList) nextBatchChannel() chan batchDoneMsg {
if l.head == nil {
return nil
}
return l.head.doneChan
}

func (l *batchList) pop() *batch {
ch := l.head
if ch != nil {
Expand Down Expand Up @@ -408,5 +410,5 @@ func (b *batch) FreeEntries() {
}

func (b *batch) Done() {
b.doneChan <- batchDoneMsg{}
b.queue.ackedChan <- b
}
9 changes: 2 additions & 7 deletions libbeat/publisher/queue/memqueue/runloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,8 @@ func (l *runLoop) handleGetReply(req *getRequest) {
}

func (l *runLoop) handleDelete(count int) {
// Clear the internal event pointers so they can be garbage collected
for i := 0; i < count; i++ {
index := (l.bufPos + i) % len(l.broker.buf)
l.broker.buf[index].event = nil
}

// Advance position and counters
// Advance position and counters. Event data has already been cleared
// by ackLoop.
l.bufPos = (l.bufPos + count) % len(l.broker.buf)
l.eventCount -= count
l.consumedCount -= count
Expand Down
Loading