diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index 1a964d8bb45f..4a512730f2f8 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -37,8 +37,6 @@ func newACKLoop(broker *broker) *ackLoop { func (l *ackLoop) run() { b := l.broker for { - nextBatchChan := l.pendingBatches.nextBatchChannel() - select { case <-b.ctx.Done(): // The queue is shutting down. @@ -48,19 +46,45 @@ func (l *ackLoop) run() { // New batches have been generated, add them to the pending list l.pendingBatches.concat(&chanList) - case <-nextBatchChan: - // The oldest outstanding batch has been acknowledged, advance our - // position as much as we can. - l.handleBatchSig() + // Subtlety: because runLoop delivers batches to consumedChan + // asynchronously, it's possible that they were already acknowledged + // before being added to pendingBatches, and in that case we should + // advance our position immediately. + l.maybeAdvanceBatchPosition() + + case acked := <-b.ackedChan: + // A batch has been acknowledged. Mark it as done, remove its events + // from the queue buffer, and check if the queue position can be + // advanced. + l.handleBatchACK(acked) + l.maybeAdvanceBatchPosition() } } } -// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig -// is run by the ackLoop. -func (l *ackLoop) handleBatchSig() int { - ackedBatches := l.collectAcked() +// Collect all contiguous acknowledged batches from the start of the +// pending list. +func (l *ackLoop) collectACKed() batchList { + ackedBatches := batchList{} + + for !l.pendingBatches.empty() { + batch := l.pendingBatches.front() + // This check is safe since only the ackLoop goroutine can modify + // "done" after the batch is created. + if !batch.done { + break + } + ackedBatches.append(l.pendingBatches.pop()) + } + return ackedBatches +} + +func (l *ackLoop) maybeAdvanceBatchPosition() { + ackedBatches := l.collectACKed() + if ackedBatches.empty() { + return + } count := 0 for batch := ackedBatches.front(); batch != nil; batch = batch.next { count += batch.count @@ -84,28 +108,15 @@ func (l *ackLoop) handleBatchSig() int { l.broker.logger.Debug("ackloop: return ack to broker loop:", count) l.broker.logger.Debug("ackloop: done send ack") - return count } -func (l *ackLoop) collectAcked() batchList { - ackedBatches := batchList{} - - acks := l.pendingBatches.pop() - ackedBatches.append(acks) - - done := false - for !l.pendingBatches.empty() && !done { - acks := l.pendingBatches.front() - select { - case <-acks.doneChan: - ackedBatches.append(l.pendingBatches.pop()) - - default: - done = true - } +func (l *ackLoop) handleBatchACK(b *batch) { + // Clear all event pointers so the memory can be reclaimed immediately. + for i := 0; i < b.count; i++ { + entry := b.rawEntry(i) + entry.event = nil } - - return ackedBatches + b.done = true } // Called by ackLoop. This function exists to decouple the work of collecting diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index e1d0fd46c004..93498adafae9 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -78,6 +78,12 @@ type broker struct { // through this channel so ackLoop can monitor them for acknowledgments. consumedChan chan batchList + // When a batch is acknowledged by a consumer, it is sent to this channel, + // where it is read by ackLoop. When the oldest remaining batches are + // acknowledged, ackLoop calls any appropriate acknowledgment callbacks + // and notifies runLoop to advance the queue position. + ackedChan chan *batch + // ackCallback is a configurable callback to invoke when ACKs are processed. // ackLoop calls this function when it advances the consumer ACK position. // Right now this forwards the notification to queueACKed() in @@ -86,8 +92,8 @@ type broker struct { // When batches are acknowledged, ackLoop saves any metadata needed // for producer callbacks and such, then notifies runLoop that it's - // safe to free these events and advance the queue by sending the - // acknowledged event count to this channel. + // safe to advance the queue position by sending the acknowledged + // event count to this channel. deleteChan chan int /////////////////////////////// @@ -129,9 +135,9 @@ type batch struct { // Position and length of the events within the queue buffer start, count int - // batch.Done() sends to doneChan, where ackLoop reads it and handles - // acknowledgment / cleanup. - doneChan chan batchDoneMsg + // Set to true when an acknowledgment for this batch is received. + // Only read/written by the ackLoop goroutine. + done bool } type batchList struct { @@ -222,6 +228,10 @@ func newQueue( // internal runLoop and ackLoop channels consumedChan: make(chan batchList), deleteChan: make(chan int), + // ackedChan is buffered so output workers don't block on acknowledgment + // if ackLoop is busy. (10 is probably more than we really need, but + // no harm in being safe.) + ackedChan: make(chan *batch, 10), ackCallback: ackCallback, } @@ -287,9 +297,7 @@ func (b *broker) Metrics() (queue.Metrics, error) { var batchPool = sync.Pool{ New: func() interface{} { - return &batch{ - doneChan: make(chan batchDoneMsg, 1), - } + return &batch{} }, } @@ -299,6 +307,7 @@ func newBatch(queue *broker, start, count int) *batch { batch.queue = queue batch.start = start batch.count = count + batch.done = false return batch } @@ -346,13 +355,6 @@ func (l *batchList) front() *batch { return l.head } -func (l *batchList) nextBatchChannel() chan batchDoneMsg { - if l.head == nil { - return nil - } - return l.head.doneChan -} - func (l *batchList) pop() *batch { ch := l.head if ch != nil { @@ -408,5 +410,5 @@ func (b *batch) FreeEntries() { } func (b *batch) Done() { - b.doneChan <- batchDoneMsg{} + b.queue.ackedChan <- b } diff --git a/libbeat/publisher/queue/memqueue/runloop.go b/libbeat/publisher/queue/memqueue/runloop.go index 0f7788c62098..c2cb90d62cea 100644 --- a/libbeat/publisher/queue/memqueue/runloop.go +++ b/libbeat/publisher/queue/memqueue/runloop.go @@ -187,13 +187,8 @@ func (l *runLoop) handleGetReply(req *getRequest) { } func (l *runLoop) handleDelete(count int) { - // Clear the internal event pointers so they can be garbage collected - for i := 0; i < count; i++ { - index := (l.bufPos + i) % len(l.broker.buf) - l.broker.buf[index].event = nil - } - - // Advance position and counters + // Advance position and counters. Event data has already been cleared + // by ackLoop. l.bufPos = (l.bufPos + count) % len(l.broker.buf) l.eventCount -= count l.consumedCount -= count