Skip to content

Commit

Permalink
Merge pull request #120 from saveweb/calming-down-queue-stats
Browse files Browse the repository at this point in the history
Calming down queue stats
  • Loading branch information
CorentinB authored Aug 6, 2024
2 parents b8fa426 + 9bdeb1d commit 9f798b1
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 72 deletions.
1 change: 1 addition & 0 deletions internal/pkg/crawl/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (w *Worker) Run() {
}
}

w.state.lastAction = "dequeueing item"
// Try to get an item from the queue or handover channel
item, err := w.pool.Crawl.Queue.Dequeue()
if err != nil {
Expand Down
32 changes: 24 additions & 8 deletions internal/pkg/queue/enqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ func TestEnqueue(t *testing.T) {
t.Fatalf("Expected UniqueHosts to be 1, got %d", q.GetStats().UniqueHosts)
}

if q.GetStats().ElementsPerHost["example.fr"] != 1 {
t.Fatalf("Expected ElementsPerHost[example.fr] to be 1, got %d", q.GetStats().ElementsPerHost["example.fr"])
ElementsPerHost := q.GetElementsPerHost()
q.statsMutex.Lock()
defer q.statsMutex.Unlock()

if (*ElementsPerHost)["example.fr"] != 1 {
t.Fatalf("Expected ElementsPerHost[example.fr] to be 1, got %d", (*ElementsPerHost)["example.fr"])
}
})

Expand Down Expand Up @@ -82,8 +86,12 @@ func TestEnqueue(t *testing.T) {
t.Fatalf("Expected UniqueHosts to be 3, got %d", q.GetStats().UniqueHosts)
}

if q.GetStats().ElementsPerHost["example.fr"] != 2 {
t.Fatalf("Expected ElementsPerHost[example.fr] to be 2, got %d", q.GetStats().ElementsPerHost["example.fr"])
ElementsPerHost := q.GetElementsPerHost()
q.statsMutex.Lock()
defer q.statsMutex.Unlock()

if (*ElementsPerHost)["example.fr"] != 2 {
t.Fatalf("Expected ElementsPerHost[example.fr] to be 2, got %d", (*ElementsPerHost)["example.fr"])
}

if q.Empty.Get() {
Expand Down Expand Up @@ -245,8 +253,12 @@ func TestBatchEnqueue(t *testing.T) {
t.Fatalf("Expected UniqueHosts to be 1, got %d", q.GetStats().UniqueHosts)
}

if q.GetStats().ElementsPerHost["example.fr"] != 1 {
t.Fatalf("Expected ElementsPerHost[example.fr] to be 1, got %d", q.GetStats().ElementsPerHost["example.fr"])
ElementsPerHost := q.GetElementsPerHost()
q.statsMutex.Lock()
defer q.statsMutex.Unlock()

if (*ElementsPerHost)["example.fr"] != 1 {
t.Fatalf("Expected ElementsPerHost[example.fr] to be 1, got %d", (*ElementsPerHost)["example.fr"])
}

if q.Empty.Get() {
Expand Down Expand Up @@ -293,8 +305,12 @@ func TestBatchEnqueue(t *testing.T) {
t.Fatalf("Expected UniqueHosts to be 3, got %d", q.GetStats().UniqueHosts)
}

if q.GetStats().ElementsPerHost["example.fr"] != 2 {
t.Fatalf("Expected ElementsPerHost[example.fr] to be 2, got %d", q.GetStats().ElementsPerHost["example.fr"])
ElementsPerHost := q.GetElementsPerHost()
q.statsMutex.Lock()
defer q.statsMutex.Unlock()

if (*ElementsPerHost)["example.fr"] != 2 {
t.Fatalf("Expected ElementsPerHost[example.fr] to be 2, got %d", (*ElementsPerHost)["example.fr"])
}

if q.Empty.Get() {
Expand Down
7 changes: 2 additions & 5 deletions internal/pkg/queue/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,8 @@ func (q *PersistentGroupedQueue) loadMetadata() error {
q.stats = &metadata.Stats

// Reinitialize maps if they're nil
if q.stats.ElementsPerHost == nil {
q.stats.ElementsPerHost = make(map[string]int)
}
if q.stats.HostDistribution == nil {
q.stats.HostDistribution = make(map[string]float64)
if q.stats.elementsPerHost == nil {
q.stats.elementsPerHost = make(map[string]int)
}

return nil
Expand Down
3 changes: 1 addition & 2 deletions internal/pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ func NewPersistentGroupedQueue(queueDirPath string, useHandover bool, useCommit
index: indexManager,
currentHost: new(atomic.Uint64),
stats: &QueueStats{
ElementsPerHost: make(map[string]int),
HostDistribution: make(map[string]float64),
elementsPerHost: make(map[string]int),
},
}

Expand Down
90 changes: 33 additions & 57 deletions internal/pkg/queue/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,74 +3,50 @@ package queue
import (
"encoding/json"
"os"
"sort"
"time"
)

type QueueStats struct {
FirstEnqueueTime time.Time `json:"first_enqueue_time"`
LastEnqueueTime time.Time `json:"last_enqueue_time"`
FirstDequeueTime time.Time `json:"first_dequeue_time"`
LastDequeueTime time.Time `json:"last_dequeue_time"`
ElementsPerHost map[string]int `json:"elements_per_host"`
HostDistribution map[string]float64 `json:"host_distribution"`
TopHosts []HostStat `json:"top_hosts"`
TotalElements int `json:"total_elements"`
UniqueHosts int `json:"unique_hosts"`
EnqueueCount int `json:"enqueue_count"`
DequeueCount int `json:"dequeue_count"`
AverageTimeBetweenEnqueues time.Duration `json:"average_time_between_enqueues"`
AverageTimeBetweenDequeues time.Duration `json:"average_time_between_dequeues"`
AverageElementsPerHost float64 `json:"average_elements_per_host"`
HandoverSuccessGetCount uint64 `json:"handover_success_get_count"`
}

type HostStat struct {
Host string `json:"host"`
Elements int `json:"elements"`
FirstEnqueueTime time.Time `json:"first_enqueue_time"`
LastEnqueueTime time.Time `json:"last_enqueue_time"`
FirstDequeueTime time.Time `json:"first_dequeue_time"`
LastDequeueTime time.Time `json:"last_dequeue_time"`
elementsPerHost map[string]int `json:"-"` // do not access it without locking statsMutex
TotalElements int `json:"total_elements"`
UniqueHosts int `json:"unique_hosts"`
EnqueueCount int `json:"enqueue_count"`
DequeueCount int `json:"dequeue_count"`
AverageTimeBetweenEnqueues time.Duration `json:"average_time_between_enqueues"`
AverageTimeBetweenDequeues time.Duration `json:"average_time_between_dequeues"`
AverageElementsPerHost float64 `json:"average_elements_per_host"`
HandoverSuccessGetCount uint64 `json:"handover_success_get_count"`
}

// GetStats generates and returns a snapshot (a value copy) of the queue
// stats. It locks statsMutex for the duration of the call.
//
// NOTE: elementsPerHost is deliberately NOT included in the snapshot: the
// map field is detached before the struct copy is made so the (possibly
// large) map is never copied. Callers that need the per-host counts must
// use GetElementsPerHost under statsMutex instead.
func (q *PersistentGroupedQueue) GetStats() QueueStats {
	q.statsMutex.Lock()
	defer q.statsMutex.Unlock()
	q.genStats()

	// Hack to avoid copying the map: nil it out, copy the struct, then
	// restore it. The return value is evaluated before the deferred
	// restore runs, so the snapshot's elementsPerHost is nil while the
	// queue's own map is left intact.
	elementsPerHost := q.stats.elementsPerHost
	q.stats.elementsPerHost = nil
	defer func() {
		q.stats.elementsPerHost = elementsPerHost
	}()

	return *q.stats
}

// GetElementsPerHost returns a pointer to the queue's internal per-host
// element counter map — not a copy. It is NOT thread-safe on its own:
// callers must hold q.statsMutex for the entire time they read or write
// through the returned pointer (see the usage in the enqueue tests).
func (q *PersistentGroupedQueue) GetElementsPerHost() *map[string]int {
	return &q.stats.elementsPerHost
}

// genStats is not thread-safe and should be called with the statsMutex locked
func (q *PersistentGroupedQueue) genStats() {
// Calculate top hosts
var topHosts []HostStat
for host, count := range q.stats.ElementsPerHost {
topHosts = append(topHosts, HostStat{Host: host, Elements: count})
}

// Sort topHosts by Elements in descending order
sort.Slice(topHosts, func(i, j int) bool {
return topHosts[i].Elements > topHosts[j].Elements
})

// Take top 10 or less
if len(topHosts) > 10 {
topHosts = topHosts[:10]
}
q.stats.TopHosts = topHosts

// Calculate host distribution
q.stats.HostDistribution = make(map[string]float64)
if q.stats.TotalElements > 0 {
for host, count := range q.stats.ElementsPerHost {
q.stats.HostDistribution[host] = float64(count) / float64(q.stats.TotalElements)
}
}

// Calculate additional q.Stats
if q.stats.UniqueHosts > 0 {
q.stats.AverageElementsPerHost = float64(q.stats.TotalElements) / float64(q.stats.UniqueHosts)
} else {
Expand Down Expand Up @@ -138,14 +114,14 @@ func (q *PersistentGroupedQueue) updateDequeueStats(host string) {
defer q.statsMutex.Unlock()

q.stats.TotalElements--
q.stats.ElementsPerHost[host]--
q.stats.elementsPerHost[host]--
if q.stats.DequeueCount == 0 {
q.stats.FirstDequeueTime = time.Now()
}
q.stats.DequeueCount++
q.stats.LastDequeueTime = time.Now()
if q.stats.ElementsPerHost[host] == 0 {
delete(q.stats.ElementsPerHost, host)
if q.stats.elementsPerHost[host] == 0 {
delete(q.stats.elementsPerHost, host)
q.stats.UniqueHosts--
}
}
Expand All @@ -155,10 +131,10 @@ func (q *PersistentGroupedQueue) updateEnqueueStats(item *Item) {
defer q.statsMutex.Unlock()

q.stats.TotalElements++
if q.stats.ElementsPerHost[item.URL.Host] == 0 {
if q.stats.elementsPerHost[item.URL.Host] == 0 {
q.stats.UniqueHosts++ // Increment UniqueHosts when we see a new host
}
q.stats.ElementsPerHost[item.URL.Host]++
q.stats.elementsPerHost[item.URL.Host]++
if q.stats.EnqueueCount == 0 {
q.stats.FirstEnqueueTime = time.Now()
}
Expand Down

0 comments on commit 9f798b1

Please sign in to comment.