Skip to content

Commit

Permalink
Restore previous version of Dequeue
Browse files Browse the repository at this point in the history
  • Loading branch information
CorentinB committed Jul 12, 2024
1 parent 2da5d78 commit 61b7b9c
Showing 1 changed file with 5 additions and 21 deletions.
26 changes: 5 additions & 21 deletions internal/pkg/queue/dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package queue
import (
"fmt"
"io"

"github.com/sirupsen/logrus"
)

// Dequeue removes and returns the next item from the queue
Expand All @@ -18,30 +16,22 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) {
defer q.mutex.Unlock()

for len(q.hostOrder) == 0 {
q.LoggingChan <- &LogMessage{
Message: "Waiting for items to be enqueued",
Level: logrus.DebugLevel,
}
q.cond.Wait()
q.cond.Wait() // This unlocks the mutex while waiting
}

// Loop through hosts until we find one with items or we've checked all hosts
hostsChecked := 0
for hostsChecked < len(q.hostOrder) {
host := q.hostOrder[q.currentHost]
positions := q.hostIndex[host]

q.LoggingChan <- &LogMessage{
Message: fmt.Sprintf("Checking host %s, positions: %d", host, len(positions)),
Level: logrus.DebugLevel,
}

if len(positions) == 0 {
// Remove this host from the order and index
q.hostOrder = append(q.hostOrder[:q.currentHost], q.hostOrder[q.currentHost+1:]...)
delete(q.hostIndex, host)
if len(q.hostOrder) == 0 {
q.currentHost = 0
continue
continue // This will cause the outer loop to check again
}
q.currentHost = q.currentHost % len(q.hostOrder)
hostsChecked++
Expand All @@ -52,22 +42,18 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) {
position := positions[0]
q.hostIndex[host] = positions[1:]

q.LoggingChan <- &LogMessage{
Message: fmt.Sprintf("Dequeuing item at position %d for host %s", position, host),
Level: logrus.DebugLevel,
}

// Seek to position and decode item
_, err := q.queueFile.Seek(int64(position), io.SeekStart)
if err != nil {
return nil, fmt.Errorf("failed to seek to item position: %w", err)
}

var item Item
err = q.queueDecoder.Decode(&item)
if err != nil {
return nil, fmt.Errorf("failed to decode item: %w", err)
}

// Move to next host
q.currentHost = (q.currentHost + 1) % len(q.hostOrder)

// Update stats
Expand All @@ -81,8 +67,6 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) {
return &item, nil
}

fmt.Println("After Loop")

// If we've checked all hosts and found no items, loop back to wait again
return q.Dequeue()
}

0 comments on commit 61b7b9c

Please sign in to comment.