-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreducer.go
84 lines (72 loc) · 2.13 KB
/
reducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package main
import (
"database/sql"
"log"
"sync"
"time"
)
// skipCriteria reports whether data should be skipped. Reducer calls it to
// decide which nodes are dropped instead of being buffered for insertion.
//
// When saveAllFiles is true nothing is skipped. Otherwise directories and
// symlinks are always kept, and any other node is skipped when its Size is at
// most 200*1024*1024 (presumably bytes, i.e. 200 MiB — confirm against
// FSNodeStat).
func skipCriteria(data *FSNodeStat, saveAllFiles bool) bool {
	// Nodes that are neither directories nor symlinks are kept only when
	// their Size is strictly above this limit.
	const sizeLimit = 200 * 1024 * 1024

	if saveAllFiles {
		return false
	}
	if data.IsDir || data.IsSymlink {
		return false
	}
	return data.Size <= sizeLimit
}
// Reducer consumes file-node stats from dataChan, maintains running totals,
// logs a progress checkpoint every loggingInterval received nodes, and passes
// the nodes not rejected by skipCriteria to BatchInsertData in batches of
// bufferSize.
//
// Parameters:
//   - db: database handle forwarded to BatchInsertData.
//   - bufferSize: number of nodes collected before a batch is inserted; values
//     below 1 are treated as 1.
//   - loggingInterval: a checkpoint is logged each time the received-node
//     count is a multiple of this value; values below 1 disable checkpoints.
//   - saveAllFiles: forwarded to skipCriteria.
//   - startTime: reference instant for the "Time elapsed" and "Average speed"
//     figures.
//   - dataChan: source of nodes. If it is closed, the pending batch is
//     inserted and Reducer returns.
//   - end: a receive on this channel (a sent value or a close) tells Reducer
//     to finish: it processes whatever is already queued in dataChan without
//     blocking, inserts the pending batch, and returns.
//   - wg: Reducer adds itself to wg on entry and marks itself done on return.
func Reducer(db *sql.DB, bufferSize int, loggingInterval int, saveAllFiles bool, startTime time.Time, dataChan chan *FSNodeStat, end chan bool, wg *sync.WaitGroup) {
	// NOTE(review): Add is called here rather than by the caller. If Reducer
	// is started with `go Reducer(...)`, a concurrent wg.Wait may return
	// before this line runs — confirm how callers synchronize.
	wg.Add(1)
	defer wg.Done()

	// A buffer size below 1 would panic (negative make, or an immediately
	// full zero-length buffer); degrade to inserting one node at a time.
	if bufferSize < 1 {
		bufferSize = 1
	}

	var (
		totalCount         uint64
		totalSize          uint64
		lastCheckpointSize uint64
	)
	lastCheckpointTime := time.Now()
	buffer := make([]*FSNodeStat, 0, bufferSize)

	// flush hands the currently buffered nodes to BatchInsertData and resets
	// the buffer while keeping its backing storage.
	flush := func() {
		BatchInsertData(buffer, db)
		buffer = buffer[:0]
	}

	// consume accounts for one received node, logs a checkpoint when due, and
	// buffers the node unless skipCriteria rejects it.
	consume := func(data *FSNodeStat) {
		totalCount++
		totalSize += data.SelfSize

		// The loggingInterval > 0 guard prevents a divide-by-zero panic.
		if loggingInterval > 0 && totalCount%uint64(loggingInterval) == 0 {
			thisCheckpointTime := time.Now()
			sizeIncrement := totalSize - lastCheckpointSize
			timeIncrement := thisCheckpointTime.Sub(lastCheckpointTime)
			timeElapsed := thisCheckpointTime.Sub(startTime)
			currentSpeed := CalculateSpeed(sizeIncrement, timeIncrement.Seconds())
			averageSpeed := CalculateSpeed(totalSize, timeElapsed.Seconds())

			log.Printf("===== Checkpoint =====")
			log.Printf("Data in queue: %v", len(dataChan))
			log.Printf("Memory used: %s", toAppropriateUnit(GetSystemMemory()))
			log.Printf("Current file node: %s", data.Path)
			log.Printf("File nodes scanned: %d", totalCount)
			log.Printf("Size scanned: %s", toAppropriateUnit(totalSize))
			log.Printf("Time elapsed: %s", timeElapsed.String())
			log.Printf("Current speed: %s/s", toAppropriateUnit(currentSpeed))
			log.Printf("Average speed: %s/s", toAppropriateUnit(averageSpeed))

			lastCheckpointSize = totalSize
			lastCheckpointTime = thisCheckpointTime
		}

		if skipCriteria(data, saveAllFiles) {
			return
		}

		buffer = append(buffer, data)
		if len(buffer) == bufferSize {
			flush()
		}
	}

	for {
		// No default case here: block until there is work or an end signal
		// instead of spinning on an empty queue.
		select {
		case data, ok := <-dataChan:
			if !ok {
				// dataChan was closed; nothing more can arrive.
				flush()
				return
			}
			consume(data)
		case <-end:
			// Process whatever is already queued without blocking, then
			// insert the final (possibly empty) batch.
			for {
				select {
				case data, ok := <-dataChan:
					if !ok {
						flush()
						return
					}
					consume(data)
				default:
					flush()
					return
				}
			}
		}
	}
}
// CalculateSpeed returns bytes divided by duration, truncated toward zero.
// Reducer passes duration in seconds, so the result there is bytes per second.
//
// It returns 0 when duration is zero, negative, or NaN, because no meaningful
// rate exists for those inputs. If the quotient does not fit in a uint64, the
// result saturates at the maximum uint64 value, since converting an
// out-of-range float to an integer is implementation-dependent in Go.
func CalculateSpeed(bytes uint64, duration float64) uint64 {
	// Written as a negated comparison so that NaN (for which every
	// comparison is false) is rejected together with zero and negatives.
	if !(duration > 0) {
		return 0
	}

	rate := float64(bytes) / duration
	// 1<<64 is the first value above the uint64 range and is exactly
	// representable as a float64; this also catches +Inf.
	if rate >= 1<<64 {
		return ^uint64(0)
	}
	return uint64(rate)
}