diff --git a/cache/cache.go b/cache/cache.go
index 83722d466..ac3b9f0f5 100644
--- a/cache/cache.go
+++ b/cache/cache.go
@@ -15,19 +15,24 @@ import (
 )
 
 var (
-	ErrTimeout  = errors.New("cache: timeout")
-	ErrNotFound = errors.New("cache: not found")
+	ErrTimeout    = errors.New("cache: timeout")
+	ErrNotFound   = errors.New("cache: not found")
+	ErrAlreadySet = errors.New("cache: already set")
 )
 
 type BytesCache interface {
 	Get(k string) ([]byte, error)
 	Set(k string, v []byte, expire int32)
+	Add(k string, v []byte, expire int32) error
+	Del(k string)
 }
 
 type NullCache struct{}
 
-func (NullCache) Get(string) ([]byte, error) { return nil, ErrNotFound }
-func (NullCache) Set(string, []byte, int32)  {}
+func (NullCache) Get(string) ([]byte, error)      { return nil, ErrNotFound }
+func (NullCache) Set(string, []byte, int32)       {}
+func (NullCache) Add(string, []byte, int32) error { return nil }
+func (NullCache) Del(string)                      {}
 
 func NewExpireCache(maxsize uint64) BytesCache {
 	ec := expirecache.New(maxsize)
@@ -49,6 +54,10 @@ func (ec ExpireCache) Get(k string) ([]byte, error) {
 	return v.([]byte), nil
 }
 
+func (ec ExpireCache) Add(string, []byte, int32) error { return nil }
+
+func (ec ExpireCache) Del(string) {}
+
 func (ec ExpireCache) Set(k string, v []byte, expire int32) {
 	ec.ec.Set(k, v, uint64(len(v)), expire)
 }
@@ -67,6 +76,22 @@ type MemcachedCache struct {
 	timeouts uint64
 }
 
+func (m *MemcachedCache) Add(k string, v []byte, expire int32) error {
+	key := sha1.Sum([]byte(k))
+	hk := hex.EncodeToString(key[:])
+	if err := m.client.Add(&memcache.Item{Key: m.prefix + hk, Value: v, Expiration: expire}); err == memcache.ErrNotStored {
+		return ErrAlreadySet
+	}
+
+	return nil
+}
+
+func (m *MemcachedCache) Del(k string) {
+	key := sha1.Sum([]byte(k))
+	hk := hex.EncodeToString(key[:])
+	go m.client.Delete(m.prefix + hk)
+}
+
 func (m *MemcachedCache) Get(k string) ([]byte, error) {
 	key := sha1.Sum([]byte(k))
 	hk := hex.EncodeToString(key[:])
diff --git a/cmd/carbonapi/config/config.go b/cmd/carbonapi/config/config.go
index 5c5944415..84afbaf3e 100644
--- a/cmd/carbonapi/config/config.go
+++ b/cmd/carbonapi/config/config.go
@@ -23,10 +23,13 @@ var DefaultLoggerConfig = zapwriter.Config{
 }
 
 type CacheConfig struct {
-	Type              string   `mapstructure:"type"`
-	Size              int      `mapstructure:"size_mb"`
-	MemcachedServers  []string `mapstructure:"memcachedServers"`
-	DefaultTimeoutSec int32    `mapstructure:"defaultTimeoutSec"`
+	Type                            string   `mapstructure:"type"`
+	Size                            int      `mapstructure:"size_mb"`
+	MemcachedServers                []string `mapstructure:"memcachedServers"`
+	DefaultTimeoutSec               int32    `mapstructure:"defaultTimeoutSec"`
+	DogpileProtection               bool     `mapstructure:"dogpileProtection"`
+	DogpileProtectionLockTimeoutSec int32    `mapstructure:"dogpileProtectionLockTimeoutSec"`
+	DogpileProtectionRetryDelayMs   int      `mapstructure:"dogpileProtectionRetryDelayMs"`
 }
 
 type GraphiteConfig struct {
diff --git a/cmd/carbonapi/config/init.go b/cmd/carbonapi/config/init.go
index aca78b0e9..086b47fd5 100644
--- a/cmd/carbonapi/config/init.go
+++ b/cmd/carbonapi/config/init.go
@@ -159,6 +159,11 @@ func SetUpConfig(logger *zap.Logger, BuildVersion string) {
 
 	Config.ResponseCache = createCache(logger, "cache", Config.ResponseCacheConfig)
 	Config.BackendCache = createCache(logger, "backendCache", Config.BackendCacheConfig)
+	if Config.BackendCacheConfig.DogpileProtection && Config.BackendCacheConfig.Type != "memcache" {
+		// dogpile protection is only effective for memcache
+		Config.BackendCacheConfig.DogpileProtection = false
+		logger.Warn("dogpile protection is enabled in the config but the cache type is not memcache, this has no effect")
+	}
 
 	if Config.TimezoneString != "" {
 		fields := strings.Split(Config.TimezoneString, ",")
@@ -310,6 +315,8 @@ func SetUpViper(logger *zap.Logger, configPath *string, viperPrefix string) {
 	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
 	viper.BindEnv("tz", "carbonapi_tz")
 	viper.SetDefault("listen", "localhost:8081")
+	viper.SetDefault("backendCache.dogpileProtectionLockTimeoutSec", 60)
+	viper.SetDefault("backendCache.dogpileProtectionRetryDelayMs", 100)
 	viper.SetDefault("concurency", 20)
 	viper.SetDefault("cache.type", "mem")
 	viper.SetDefault("cache.size_mb", 0)
diff --git a/cmd/carbonapi/http/render_handler.go b/cmd/carbonapi/http/render_handler.go
index 5f9fabe79..322b728b9 100644
--- a/cmd/carbonapi/http/render_handler.go
+++ b/cmd/carbonapi/http/render_handler.go
@@ -2,6 +2,7 @@ package http
 
 import (
 	"bytes"
+	"context"
 	"encoding/gob"
 	"errors"
 	"fmt"
@@ -11,6 +12,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/go-graphite/carbonapi/cache"
+
 	"github.com/ansel1/merry"
 	"github.com/go-graphite/carbonapi/carbonapipb"
 	"github.com/go-graphite/carbonapi/cmd/carbonapi/config"
@@ -243,14 +246,13 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {
 		}
 	}()
 
-	errors := make(map[string]merry.Error)
 	backendCacheKey := backendCacheComputeKey(from, until, targets)
-	results, err := backendCacheFetchResults(logger, useCache, backendCacheKey, accessLogDetails)
-
-	if err != nil {
+	// storeFunc computes the results from the query and stores them in the backend cache
+	storeFunc := func() ([]*types.MetricData, map[string]merry.Error) {
 		ApiMetrics.BackendCacheMisses.Add(1)
-		results = make([]*types.MetricData, 0)
+		results := make([]*types.MetricData, 0)
+		errors := make(map[string]merry.Error)
 		values := make(map[parser.MetricRequest][]*types.MetricData)
 
 		for _, target := range targets {
@@ -259,7 +261,7 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {
 				msg := buildParseErrorString(target, e, err)
 				setError(w, accessLogDetails, msg, http.StatusBadRequest)
 				logAsError = true
-				return
+				return nil, nil
 			}
 
 			ApiMetrics.RenderRequests.Add(1)
@@ -279,6 +281,23 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {
 		if len(errors) == 0 {
 			backendCacheStoreResults(logger, backendCacheKey, results, backendCacheTimeout)
 		}
+		return results, errors
+	}
+
+	var results []*types.MetricData
+	var errors map[string]merry.Error
+	if !useCache {
+		results, errors = storeFunc()
+	} else if config.Config.BackendCacheConfig.DogpileProtection {
+		results, errors = backendCacheGetOrSetExclusive(r.Context(), logger, backendCacheKey, storeFunc)
+	} else {
+		results, err = backendCacheFetchResults(logger, useCache, backendCacheKey, accessLogDetails)
+		if err != nil {
+			results, errors = storeFunc()
+		}
+	}
+	if results == nil && err == nil {
+		return
 	}
 
 	size := 0
@@ -378,6 +397,70 @@ func backendCacheComputeKey(from, until string, targets []string) string {
 	return backendCacheKey.String()
 }
 
+// backendCacheGetOrSetExclusive tries to retrieve data from the cache. When the data does not exist, it determines
+// whether the value must be computed locally or whether someone else is already computing it, in which case it waits
+// for the value to become available until the lock timeout expires.
+func backendCacheGetOrSetExclusive(ctx context.Context, logger *zap.Logger, key string, storeFunc func() ([]*types.MetricData, map[string]merry.Error)) ([]*types.MetricData, map[string]merry.Error) {
+	timeout := time.NewTicker(time.Duration(config.Config.BackendCacheConfig.DogpileProtectionLockTimeoutSec) * time.Second)
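+	// the ticker above acts as the overall lock timeout: timeout.C first fires after
+	// DogpileProtectionLockTimeoutSec, at which point the loop below stops waiting and computes the value locally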
+	defer timeout.Stop()
+
+	for {
+		// check the timeout
+		select {
+		case <-ctx.Done():
+			return nil, nil
+		case <-timeout.C:
+			logger.Warn("Could not fetch backendCache data within lock timeout")
+			// we could not get data from the cache within the timeout; compute the value and store it in the cache
+			return storeFunc()
+		default:
+		}
+
+		// try to fetch the value from cache
+		backendCacheResults, err := config.Config.BackendCache.Get(key)
+		// there is no cached value, try to acquire the lock
+		if err == cache.ErrNotFound {
+			lockKey := key + " lock"
+			err := config.Config.BackendCache.Add(lockKey, []byte("lock"), config.Config.BackendCacheConfig.DogpileProtectionLockTimeoutSec)
+			// we could not acquire the lock
+			if err == cache.ErrAlreadySet {
+				// wait a little bit for someone else to compute the value
+				time.Sleep(time.Duration(config.Config.BackendCacheConfig.DogpileProtectionRetryDelayMs) * time.Millisecond)
+				continue
+			}
+
+			// we have the lock
+			defer config.Config.BackendCache.Del(lockKey)
+
+			// check again that the value was not set since our previous check
+			backendCacheResults, err = config.Config.BackendCache.Get(key)
+			// the value is still missing, compute it ourselves
+			if err == cache.ErrNotFound {
+				return storeFunc()
+			}
+		}
+
+		if err != nil {
+			return storeFunc()
+		}
+
+		// we have data in the cache
+		metrics, err := bytesToMetricsData(backendCacheResults)
+		if err != nil {
+			return storeFunc()
+		}
+
+		return metrics, nil
+	}
+}
+
+func bytesToMetricsData(data []byte) ([]*types.MetricData, error) {
+	var results []*types.MetricData
+	cacheDecodingBuf := bytes.NewBuffer(data)
+	dec := gob.NewDecoder(cacheDecodingBuf)
+	err := dec.Decode(&results)
+	return results, err
+}
+
 func backendCacheFetchResults(logger *zap.Logger, useCache bool, backendCacheKey string, accessLogDetails *carbonapipb.AccessLogDetails) ([]*types.MetricData, error) {
 	if !useCache {
 		return nil, errors.New("useCache is false")
@@ -389,11 +472,7 @@ func backendCacheFetchResults(logger *zap.Logger, useCache bool, backendCacheKey
 		return nil, err
 	}
 
-	var results []*types.MetricData
-	cacheDecodingBuf := bytes.NewBuffer(backendCacheResults)
-	dec := gob.NewDecoder(cacheDecodingBuf)
-	err = dec.Decode(&results)
-
+	results, err := bytesToMetricsData(backendCacheResults)
 	if err != nil {
 		logger.Error("Error decoding cached backend results")
 		return nil, err
diff --git a/doc/configuration.md b/doc/configuration.md
index 7d1d7e422..561f7b117 100644
--- a/doc/configuration.md
+++ b/doc/configuration.md
@@ -236,9 +236,14 @@ cache:
 ## backendCache
 Specify what storage to use for backend cache. This cache stores the responses
 from the backends. It should have more cache hits than the response cache since
-the response format and the maxDataPoints paramter are not part of the cache
+the response format and the maxDataPoints parameter are not part of the cache
 key, but results from cache still need to be postprocessed (e.g. serialized to
-desired response format).
+desired response format).
+
+When using `memcache`, you can prevent the [dogpile effect](https://en.wikipedia.org/wiki/Cache_stampede) with the `dogpileProtection*` opt-in settings (disabled by default).
+If enabled, multiple `carbonapi` threads or servers won't try to fetch the data from the backends if the same request is already being computed somewhere else.
+For instance, it's useful when you have a bunch of servers, long-running queries, and multiple users accessing the same dashboards/queries.
+Those settings have no effect outside of the `memcache` cache backend.
 
 Supports same options as the response cache.
 
 ### Example
 ```
 backendCache:
    type: "memcache"
    size_mb: 0
    memcachedServers:
       - "127.0.0.1:1234"
       - "127.0.0.2:1235"
+   dogpileProtection: true # default is false
+   dogpileProtectionLockTimeoutSec: 60 # this is the default value. If the lock cannot be obtained within the timeout, the cache is not used
+   dogpileProtectionRetryDelayMs: 100 # this is the default value. When someone else is computing the response, we try to get a cached response every dogpileProtectionRetryDelayMs
 ```
 ***
 ## cpus
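
Reviewer note on the locking primitive this patch builds on: memcached's `add` stores a key only if it does not already exist, so exactly one client wins the race; the new `MemcachedCache.Add` maps the resulting `memcache.ErrNotStored` to `ErrAlreadySet`, and the item's expiration bounds how long a crashed lock holder can block others. Below is a minimal standalone sketch of that pattern using the same gomemcache client — the server address, key name, and timings are illustrative only, not taken from this patch:

```go
package main

import (
	"fmt"
	"time"

	"github.com/bradfitz/gomemcache/memcache"
)

func main() {
	// Assumes a reachable memcached instance; the address is illustrative.
	mc := memcache.New("127.0.0.1:11211")
	lockKey := "render:example:lock"

	// Add is atomic on the server: it succeeds only when the key is absent,
	// so a single caller "acquires the lock". Expiration (in seconds) bounds
	// how long a crashed holder can block everyone else.
	err := mc.Add(&memcache.Item{Key: lockKey, Value: []byte("lock"), Expiration: 60})
	switch err {
	case nil:
		// We won: compute the expensive value, Set it under the real key,
		// then release the lock.
		defer mc.Delete(lockKey)
		fmt.Println("lock acquired, computing value")
	case memcache.ErrNotStored:
		// Someone else is computing: wait briefly, then poll the cache for
		// the computed value (or retry the lock).
		time.Sleep(100 * time.Millisecond)
		fmt.Println("lock held elsewhere, retrying shortly")
	default:
		// memcached unreachable: the safest fallback is to compute locally.
		fmt.Println("memcached error, computing locally:", err)
	}
}
```

`backendCacheGetOrSetExclusive` above walks the same decision tree, with one extra step: after acquiring the lock it re-checks the cache, since another server may have stored the value between the initial miss and the `Add`.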