Add: large scale test (doesn't pass yet)
CorentinB committed Jul 12, 2024
1 parent ea748ea commit 2da5d78
Showing 4 changed files with 89 additions and 5 deletions.
2 changes: 2 additions & 0 deletions internal/pkg/crawl/crawl.go
@@ -161,6 +161,8 @@ func (c *Crawl) Start() (err error) {
c.Log.WithFields(c.genLogFields(nil, nil, log.Fields)).Warn(log.Message)
case logrus.InfoLevel:
c.Log.WithFields(c.genLogFields(nil, nil, log.Fields)).Info(log.Message)
case logrus.DebugLevel:
c.Log.WithFields(c.genLogFields(nil, nil, log.Fields)).Debug(log.Message)
}
}
}()
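The added case forwards the queue's new debug-level messages to the crawl logger instead of silently dropping them. A self-contained sketch of that dispatch pattern, using a stand-in LogMessage type and a plain logrus logger (the real code goes through the crawl's genLogFields helper, which is not reproduced here):

package main

import "github.com/sirupsen/logrus"

// LogMessage mirrors the shape the queue package sends over its LoggingChan:
// a message, a logrus level and optional structured fields.
type LogMessage struct {
	Message string
	Level   logrus.Level
	Fields  map[string]interface{}
}

// forward dispatches queued log messages to a logrus logger, level by level,
// including the debug case this commit adds.
func forward(logger *logrus.Logger, ch <-chan *LogMessage) {
	for msg := range ch {
		entry := logger.WithFields(logrus.Fields(msg.Fields))
		switch msg.Level {
		case logrus.WarnLevel:
			entry.Warn(msg.Message)
		case logrus.InfoLevel:
			entry.Info(msg.Message)
		case logrus.DebugLevel:
			entry.Debug(msg.Message)
		}
	}
}

func main() {
	logger := logrus.New()
	logger.SetLevel(logrus.DebugLevel) // debug messages are filtered out at the default Info level

	ch := make(chan *LogMessage, 16)
	done := make(chan struct{})
	go func() {
		forward(logger, ch)
		close(done)
	}()

	ch <- &LogMessage{Message: "Waiting for items to be enqueued", Level: logrus.DebugLevel}
	close(ch)
	<-done
}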
26 changes: 21 additions & 5 deletions internal/pkg/queue/dequeue.go
@@ -3,6 +3,8 @@ package queue
import (
"fmt"
"io"

"github.com/sirupsen/logrus"
)

// Dequeue removes and returns the next item from the queue
@@ -16,22 +18,30 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) {
defer q.mutex.Unlock()

for len(q.hostOrder) == 0 {
q.cond.Wait() // This unlocks the mutex while waiting
q.LoggingChan <- &LogMessage{
Message: "Waiting for items to be enqueued",
Level: logrus.DebugLevel,
}
q.cond.Wait()
}

// Loop through hosts until we find one with items or we've checked all hosts
hostsChecked := 0
for hostsChecked < len(q.hostOrder) {
host := q.hostOrder[q.currentHost]
positions := q.hostIndex[host]

q.LoggingChan <- &LogMessage{
Message: fmt.Sprintf("Checking host %s, positions: %d", host, len(positions)),
Level: logrus.DebugLevel,
}

if len(positions) == 0 {
// Remove this host from the order and index
q.hostOrder = append(q.hostOrder[:q.currentHost], q.hostOrder[q.currentHost+1:]...)
delete(q.hostIndex, host)
if len(q.hostOrder) == 0 {
q.currentHost = 0
continue // This will cause the outer loop to check again
continue
}
q.currentHost = q.currentHost % len(q.hostOrder)
hostsChecked++
@@ -42,18 +52,22 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) {
position := positions[0]
q.hostIndex[host] = positions[1:]

// Seek to position and decode item
q.LoggingChan <- &LogMessage{
Message: fmt.Sprintf("Dequeuing item at position %d for host %s", position, host),
Level: logrus.DebugLevel,
}

_, err := q.queueFile.Seek(int64(position), io.SeekStart)
if err != nil {
return nil, fmt.Errorf("failed to seek to item position: %w", err)
}

var item Item
err = q.queueDecoder.Decode(&item)
if err != nil {
return nil, fmt.Errorf("failed to decode item: %w", err)
}

// Move to next host
q.currentHost = (q.currentHost + 1) % len(q.hostOrder)

// Update stats
@@ -67,6 +81,8 @@ func (q *PersistentGroupedQueue) Dequeue() (*Item, error) {
return &item, nil
}

fmt.Println("After Loop")

// If we've checked all hosts and found no items, loop back to wait again
return q.Dequeue()
}
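The debug messages added above trace Dequeue's host rotation: hostOrder fixes the round-robin order, hostIndex maps each host to the stored positions of its pending items, and currentHost advances modulo the number of remaining hosts, with exhausted hosts dropped from the rotation. A stripped-down, in-memory sketch of that rotation (plain strings instead of file offsets, no locking and no persistence; this is a sketch, not code from the repository):

package main

import "fmt"

// groupedQueue is a toy, in-memory version of the grouped queue:
// items are plain strings instead of offsets into a queue file.
type groupedQueue struct {
	hostOrder   []string
	hostIndex   map[string][]string
	currentHost int
}

// dequeue returns the next item in round-robin host order, or false
// once every remaining host has run out of items.
func (q *groupedQueue) dequeue() (string, bool) {
	checked := 0
	for checked < len(q.hostOrder) {
		host := q.hostOrder[q.currentHost]
		items := q.hostIndex[host]

		if len(items) == 0 {
			// Drop exhausted hosts from the rotation entirely.
			q.hostOrder = append(q.hostOrder[:q.currentHost], q.hostOrder[q.currentHost+1:]...)
			delete(q.hostIndex, host)
			if len(q.hostOrder) == 0 {
				return "", false
			}
			q.currentHost %= len(q.hostOrder)
			checked++
			continue
		}

		// Take the oldest item for this host, then move on to the next host.
		item := items[0]
		q.hostIndex[host] = items[1:]
		q.currentHost = (q.currentHost + 1) % len(q.hostOrder)
		return item, true
	}
	return "", false
}

func main() {
	q := &groupedQueue{
		hostOrder: []string{"example.com", "test.org"},
		hostIndex: map[string][]string{
			"example.com": {"a1", "a2"},
			"test.org":    {"b1"},
		},
	}
	for {
		item, ok := q.dequeue()
		if !ok {
			break
		}
		fmt.Println(item) // prints a1, b1, a2
	}
}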
4 changes: 4 additions & 0 deletions internal/pkg/queue/queue.go
@@ -62,6 +62,10 @@ type Item struct {
BypassSeencheck string
}

func init() {
gob.Register(Item{})
}

func NewPersistentGroupedQueue(queueDirPath string, loggingChan chan *LogMessage) (*PersistentGroupedQueue, error) {
err := os.MkdirAll(queueDirPath, 0755)
if err != nil {
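The new init function makes the concrete Item type known to encoding/gob before anything is written to or read from the queue file; registration is what lets gob handle the type when it travels inside an interface value (for a plain struct it is harmless). A minimal, self-contained round trip with a stand-in struct and an in-memory buffer (the real queue encodes to and decodes from its on-disk queue file):

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// item is a stand-in for queue.Item; only a couple of fields are shown.
type item struct {
	ID   string
	Host string
}

func init() {
	// Mirrors the commit: register the concrete type with gob up front,
	// so it can also be decoded when wrapped in an interface value.
	gob.Register(item{})
}

func main() {
	var buf bytes.Buffer

	enc := gob.NewEncoder(&buf)
	if err := enc.Encode(item{ID: "id-42", Host: "example.com"}); err != nil {
		panic(err)
	}

	var out item
	dec := gob.NewDecoder(&buf)
	if err := dec.Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}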
62 changes: 62 additions & 0 deletions internal/pkg/queue/queue_test.go
@@ -1,10 +1,12 @@
package queue

import (
"fmt"
"net/url"
"os"
"path"
"testing"
"time"
)

func TestNewItem(t *testing.T) {
@@ -192,3 +194,63 @@ func TestPersistentGroupedQueue_Close(t *testing.T) {
t.Errorf("Expected ErrQueueClosed on Enqueue after Close, got: %v", err)
}
}

func TestLargeScaleEnqueueDequeue(t *testing.T) {
// Increase test timeout
t.Parallel()

tempDir, err := os.MkdirTemp("", "queue_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)

queuePath := path.Join(tempDir, "test_queue")
loggingChan := make(chan *LogMessage, 10000)

q, err := NewPersistentGroupedQueue(queuePath, loggingChan)
if err != nil {
t.Fatalf("Failed to create new queue: %v", err)
}
defer q.Close()

numItems := 1000
hosts := []string{"example.com", "test.org", "sample.net", "demo.io"}

// Enqueue items
startEnqueue := time.Now()
for i := 0; i < numItems; i++ {
host := hosts[i%len(hosts)]
u, _ := url.Parse(fmt.Sprintf("https://%s/page%d", host, i))
item := NewItem(u, nil, "page", 1, fmt.Sprintf("id-%d", i), false)
err := q.Enqueue(item)
if err != nil {
t.Fatalf("Failed to enqueue item %d: %v", i, err)
}
}
enqueueTime := time.Since(startEnqueue)
t.Logf("Enqueue time for %d items: %v", numItems, enqueueTime)

// Dequeue items
startDequeue := time.Now()
dequeuedItems := make(map[string]bool)
for i := 0; i < numItems; i++ {
item, err := q.Dequeue()
if err != nil {
t.Fatalf("Failed to dequeue item %d: %v", i, err)
}
if item == nil {
t.Fatalf("Dequeued nil item at position %d", i)
}
if dequeuedItems[item.ID] {
t.Errorf("Item with ID %s dequeued more than once", item.ID)
}

dequeuedItems[item.ID] = true
}
dequeueTime := time.Since(startDequeue)

t.Logf("Dequeue time for %d items: %v", numItems, dequeueTime)
t.Logf("Average enqueue time per item: %v", enqueueTime/time.Duration(numItems))
t.Logf("Average dequeue time per item: %v", dequeueTime/time.Duration(numItems))
}
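One detail worth noting about the test: nothing reads from loggingChan, so it relies on the 10000-entry buffer being large enough for every debug message the queue now emits while holding its mutex. With a smaller buffer, or a chattier queue, a background drain placed right after loggingChan is created would keep Dequeue from blocking on a full channel; a sketch of what that could look like inside the test, not part of the commit:

go func() {
	for range loggingChan {
		// Drop the message; the test only cares that the channel never fills up.
	}
}()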
