From f858fa1e7c30f8c201fc703cdf90b702c678752b Mon Sep 17 00:00:00 2001 From: Denys Zhdanov Date: Thu, 13 Jun 2024 14:32:43 +0200 Subject: [PATCH] Fixing data races --- carbonserver/carbonserver.go | 26 ++++++++++++++++---------- carbonserver/trie.go | 3 +++ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go index 7bd862273..31b897f12 100644 --- a/carbonserver/carbonserver.go +++ b/carbonserver/carbonserver.go @@ -902,10 +902,11 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName var t0 = time.Now() var fidx = listener.CurrentFileIndex() var files []string - var filesLen int + var filesMutex sync.Mutex + var filesLen atomic.Uint64 var details = make(map[string]*protov3.MetricDetails) var trieIdx *trieIndex - var metricsKnown uint64 + var metricsKnown atomic.Uint64 var infos []zap.Field if listener.trieIndex { if fidx == nil || !listener.concurrentIndex { @@ -929,7 +930,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName files = append(files, fileName) } if strings.HasSuffix(fileName, ".wsp") { - metricsKnown++ + metricsKnown.Add(1) } } cacheIndexRuntime := time.Since(tcache) @@ -974,9 +975,9 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName break } - filesLen++ + filesLen.Add(1) if strings.HasSuffix(entry.Path, ".wsp") { - metricsKnown++ + metricsKnown.Add(1) } } if err := flc.Close(); err != nil { @@ -1019,6 +1020,8 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName // numWorkers default in sane here (>=4, but <=32) } // please note that fastwalk.Walk function is not thread safe + // TODO: refactor this + // we can construct filesList in goroutines and update index from fileList cache singlethreaded err := fastwalk.Walk(&fastwalkConf, dir, func(p string, d fs.DirEntry, err error) error { if err != nil { logger.Info("error processing", zap.String("path", p), zap.Error(err)) @@ -1070,7 +1073,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName isFullMetric := strings.HasSuffix(info.Name(), ".wsp") if d.IsDir() || isFullMetric { // both dir and metric file is needed for supporting trigram index. trimmedName := strings.TrimPrefix(p, listener.whisperData) - filesLen++ + filesLen.Add(1) var dataPoints, logicalSize, physicalSize int64 if isFullMetric { @@ -1106,11 +1109,14 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName } } } else { + // we're in fastwalk goroutine + filesMutex.Lock() + defer filesMutex.Unlock() files = append(files, trimmedName) } if isFullMetric { - metricsKnown++ + metricsKnown.Add(1) } } @@ -1173,7 +1179,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName totalSpace := stat.Blocks * uint64(stat.Bsize) fileScanRuntime := time.Since(t0) - atomic.StoreUint64(&listener.metrics.MetricsKnown, metricsKnown) + atomic.StoreUint64(&listener.metrics.MetricsKnown, metricsKnown.Load()) atomic.AddUint64(&listener.metrics.FileScanTimeNS, uint64(fileScanRuntime.Nanoseconds())) nfidx := &fileIndex{ @@ -1239,12 +1245,12 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName zap.Duration("rdtime_update_runtime", rdTimeUpdateRuntime), zap.Duration("cache_index_runtime", cacheIndexRuntime), zap.Duration("total_runtime", time.Since(t0)), - zap.Int("Files", filesLen), + zap.Uint64("Files", filesLen.Load()), zap.Int("index_size", indexSize), zap.Int("pruned_trigrams", pruned), zap.Int("cache_metric_len_before", cacheMetricLen), zap.Int("cache_metric_len_after", len(cacheMetricNames)), - zap.Uint64("metrics_known", metricsKnown), + zap.Uint64("metrics_known", metricsKnown.Load()), zap.String("index_type", indexType), zap.Bool("read_from_cache", readFromCache), ) diff --git a/carbonserver/trie.go b/carbonserver/trie.go index 8bfad2bed..07c49c178 100644 --- a/carbonserver/trie.go +++ b/carbonserver/trie.go @@ -292,6 +292,7 @@ type trieIndex struct { fileCount int depth uint64 longestMetric string + mutex sync.Mutex // qau: Quota And Usage qauMetrics []points.Points @@ -492,6 +493,8 @@ func (t *trieInsertError) Error() string { return t.typ } // // insert returns either a file node or dir node, after inserted. func (ti *trieIndex) insert(path string, logicalSize, physicalSize, dataPoints, firstSeenAt int64) (*trieNode, error) { + ti.mutex.Lock() + defer ti.mutex.Unlock() path = filepath.Clean(path) if len(path) > 0 && path[0] == '/' { // skipcq: GO-S1005 path = path[1:]