Skip to content

Commit

Permalink
Fixing data races
Browse files Browse the repository at this point in the history
  • Loading branch information
deniszh committed Jun 13, 2024
1 parent 22b02a4 commit f858fa1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
26 changes: 16 additions & 10 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,10 +902,11 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
var t0 = time.Now()
var fidx = listener.CurrentFileIndex()
var files []string
var filesLen int
var filesMutex sync.Mutex
var filesLen atomic.Uint64
var details = make(map[string]*protov3.MetricDetails)
var trieIdx *trieIndex
var metricsKnown uint64
var metricsKnown atomic.Uint64
var infos []zap.Field
if listener.trieIndex {
if fidx == nil || !listener.concurrentIndex {
Expand All @@ -929,7 +930,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
files = append(files, fileName)
}
if strings.HasSuffix(fileName, ".wsp") {
metricsKnown++
metricsKnown.Add(1)
}
}
cacheIndexRuntime := time.Since(tcache)
Expand Down Expand Up @@ -974,9 +975,9 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
break
}

filesLen++
filesLen.Add(1)
if strings.HasSuffix(entry.Path, ".wsp") {
metricsKnown++
metricsKnown.Add(1)
}
}
if err := flc.Close(); err != nil {
Expand Down Expand Up @@ -1019,6 +1020,8 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
// numWorkers default in sane here (>=4, but <=32)
}
// please note that fastwalk.Walk function is not thread safe
// TODO: refactor this
// we can construct filesList in goroutines and update index from fileList cache singlethreaded
err := fastwalk.Walk(&fastwalkConf, dir, func(p string, d fs.DirEntry, err error) error {
if err != nil {
logger.Info("error processing", zap.String("path", p), zap.Error(err))
Expand Down Expand Up @@ -1070,7 +1073,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
isFullMetric := strings.HasSuffix(info.Name(), ".wsp")
if d.IsDir() || isFullMetric { // both dir and metric file is needed for supporting trigram index.
trimmedName := strings.TrimPrefix(p, listener.whisperData)
filesLen++
filesLen.Add(1)

var dataPoints, logicalSize, physicalSize int64
if isFullMetric {
Expand Down Expand Up @@ -1106,11 +1109,14 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
}
}
} else {
// we're in fastwalk goroutine
filesMutex.Lock()
defer filesMutex.Unlock()
files = append(files, trimmedName)
}

if isFullMetric {
metricsKnown++
metricsKnown.Add(1)
}
}

Expand Down Expand Up @@ -1173,7 +1179,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
totalSpace := stat.Blocks * uint64(stat.Bsize)

fileScanRuntime := time.Since(t0)
atomic.StoreUint64(&listener.metrics.MetricsKnown, metricsKnown)
atomic.StoreUint64(&listener.metrics.MetricsKnown, metricsKnown.Load())
atomic.AddUint64(&listener.metrics.FileScanTimeNS, uint64(fileScanRuntime.Nanoseconds()))

nfidx := &fileIndex{
Expand Down Expand Up @@ -1239,12 +1245,12 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
zap.Duration("rdtime_update_runtime", rdTimeUpdateRuntime),
zap.Duration("cache_index_runtime", cacheIndexRuntime),
zap.Duration("total_runtime", time.Since(t0)),
zap.Int("Files", filesLen),
zap.Uint64("Files", filesLen.Load()),
zap.Int("index_size", indexSize),
zap.Int("pruned_trigrams", pruned),
zap.Int("cache_metric_len_before", cacheMetricLen),
zap.Int("cache_metric_len_after", len(cacheMetricNames)),
zap.Uint64("metrics_known", metricsKnown),
zap.Uint64("metrics_known", metricsKnown.Load()),
zap.String("index_type", indexType),
zap.Bool("read_from_cache", readFromCache),
)
Expand Down
3 changes: 3 additions & 0 deletions carbonserver/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ type trieIndex struct {
fileCount int
depth uint64
longestMetric string
mutex sync.Mutex

// qau: Quota And Usage
qauMetrics []points.Points
Expand Down Expand Up @@ -492,6 +493,8 @@ func (t *trieInsertError) Error() string { return t.typ }
//
// insert returns either a file node or dir node, after inserted.
func (ti *trieIndex) insert(path string, logicalSize, physicalSize, dataPoints, firstSeenAt int64) (*trieNode, error) {
ti.mutex.Lock()
defer ti.mutex.Unlock()
path = filepath.Clean(path)
if len(path) > 0 && path[0] == '/' { // skipcq: GO-S1005
path = path[1:]
Expand Down

0 comments on commit f858fa1

Please sign in to comment.