Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backend cache dogpile protection #1

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@ import (
)

var (
ErrTimeout = errors.New("cache: timeout")
ErrNotFound = errors.New("cache: not found")
ErrTimeout = errors.New("cache: timeout")
ErrNotFound = errors.New("cache: not found")
ErrAlreadySet = errors.New("cache: already set")
)

// BytesCache is a byte-oriented cache with per-entry expiry.
type BytesCache interface {
	// Get returns the cached value for k, or an error such as
	// ErrNotFound / ErrTimeout when no value is available.
	Get(k string) ([]byte, error)
	// Set unconditionally stores v under k with the given expiry.
	Set(k string, v []byte, expire int32)
	// Add stores v under k only if k is not already present; it may
	// return ErrAlreadySet when another writer got there first
	// (used as an atomic lock primitive for dogpile protection).
	Add(k string, v []byte, expire int32) error
	// Del removes k from the cache.
	Del(k string)
}

type NullCache struct{}

func (NullCache) Get(string) ([]byte, error) { return nil, ErrNotFound }
func (NullCache) Set(string, []byte, int32) {}
func (NullCache) Get(string) ([]byte, error) { return nil, ErrNotFound }
func (NullCache) Set(string, []byte, int32) {}
func (NullCache) Add(string, []byte, int32) error { return nil }
func (NullCache) Del(string) {}

func NewExpireCache(maxsize uint64) BytesCache {
ec := expirecache.New(maxsize)
Expand All @@ -49,6 +54,10 @@ func (ec ExpireCache) Get(k string) ([]byte, error) {
return v.([]byte), nil
}

func (ec ExpireCache) Add(string, []byte, int32) error { return nil }

func (ec ExpireCache) Del(string) {}

// Set stores v under k with the given expiry, charging len(v) bytes as the
// entry's size against the cache's size accounting.
func (ec ExpireCache) Set(k string, v []byte, expire int32) {
	ec.ec.Set(k, v, uint64(len(v)), expire)
}
Expand All @@ -67,6 +76,22 @@ type MemcachedCache struct {
timeouts uint64
}

// Add stores v under k only if no value exists yet for that key. The key is
// sha1-hashed, hex-encoded and prefixed, matching the other MemcachedCache
// methods. It returns ErrAlreadySet when another writer already holds the
// key. Any other memcache error is propagated instead of being silently
// dropped (callers compare only against ErrAlreadySet, so they still fall
// back to computing locally on transport errors).
func (m *MemcachedCache) Add(k string, v []byte, expire int32) error {
	key := sha1.Sum([]byte(k))
	hk := hex.EncodeToString(key[:])

	err := m.client.Add(&memcache.Item{Key: m.prefix + hk, Value: v, Expiration: expire})
	if err == memcache.ErrNotStored {
		return ErrAlreadySet
	}
	return err
}

// Del removes k (sha1-hashed, hex-encoded and prefixed, like Get/Add) from
// memcached. The delete runs in a background goroutine and its error is
// intentionally discarded: deletion here is best-effort.
func (m *MemcachedCache) Del(k string) {
	sum := sha1.Sum([]byte(k))
	hashedKey := m.prefix + hex.EncodeToString(sum[:])
	go func() {
		// best-effort: a failed delete only delays expiry
		_ = m.client.Delete(hashedKey)
	}()
}

func (m *MemcachedCache) Get(k string) ([]byte, error) {
key := sha1.Sum([]byte(k))
hk := hex.EncodeToString(key[:])
Expand Down
11 changes: 7 additions & 4 deletions cmd/carbonapi/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ var DefaultLoggerConfig = zapwriter.Config{
}

type CacheConfig struct {
Type string `mapstructure:"type"`
Size int `mapstructure:"size_mb"`
MemcachedServers []string `mapstructure:"memcachedServers"`
DefaultTimeoutSec int32 `mapstructure:"defaultTimeoutSec"`
Type string `mapstructure:"type"`
Size int `mapstructure:"size_mb"`
MemcachedServers []string `mapstructure:"memcachedServers"`
DefaultTimeoutSec int32 `mapstructure:"defaultTimeoutSec"`
DogpileProtection bool `mapstructure:"dogpileProtection"`
DogpileProtectionLockTimeoutSec int32 `mapstructure:"dogpileProtectionLockTimeoutSec"`
DogpileProtectionRetryDelayMs int `mapstructure:"dogpileProtectionRetryDelayMs"`
}

type GraphiteConfig struct {
Expand Down
7 changes: 7 additions & 0 deletions cmd/carbonapi/config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func SetUpConfig(logger *zap.Logger, BuildVersion string) {

Config.ResponseCache = createCache(logger, "cache", Config.ResponseCacheConfig)
Config.BackendCache = createCache(logger, "backendCache", Config.BackendCacheConfig)
if Config.BackendCacheConfig.DogpileProtection && Config.BackendCacheConfig.Type != "memcache" {
// dogpile protection is only effective for memcache
Config.BackendCacheConfig.DogpileProtection = false
logger.Warn("dogpile protection is enabled in the config but the cache type is not memcache, this has no effect")
}

if Config.TimezoneString != "" {
fields := strings.Split(Config.TimezoneString, ",")
Expand Down Expand Up @@ -310,6 +315,8 @@ func SetUpViper(logger *zap.Logger, configPath *string, viperPrefix string) {
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.BindEnv("tz", "carbonapi_tz")
viper.SetDefault("listen", "localhost:8081")
viper.SetDefault("backendCache.dogpileProtectionLockTimeoutSec", 60)
viper.SetDefault("backendCache.dogpileProtectionRetryDelayMs", 100)
viper.SetDefault("concurency", 20)
viper.SetDefault("cache.type", "mem")
viper.SetDefault("cache.size_mb", 0)
Expand Down
101 changes: 90 additions & 11 deletions cmd/carbonapi/http/render_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"bytes"
"context"
"encoding/gob"
"errors"
"fmt"
Expand All @@ -11,6 +12,8 @@ import (
"strings"
"time"

"github.com/go-graphite/carbonapi/cache"

"github.com/ansel1/merry"
"github.com/go-graphite/carbonapi/carbonapipb"
"github.com/go-graphite/carbonapi/cmd/carbonapi/config"
Expand Down Expand Up @@ -243,14 +246,13 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {
}
}()

errors := make(map[string]merry.Error)
backendCacheKey := backendCacheComputeKey(from, until, targets)
results, err := backendCacheFetchResults(logger, useCache, backendCacheKey, accessLogDetails)

if err != nil {
// storeFunc computes from query and stores the results in the backend cache
storeFunc := func() ([]*types.MetricData, map[string]merry.Error) {
ApiMetrics.BackendCacheMisses.Add(1)

results = make([]*types.MetricData, 0)
results := make([]*types.MetricData, 0)
errors := make(map[string]merry.Error)
values := make(map[parser.MetricRequest][]*types.MetricData)

for _, target := range targets {
Expand All @@ -259,7 +261,7 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {
msg := buildParseErrorString(target, e, err)
setError(w, accessLogDetails, msg, http.StatusBadRequest)
logAsError = true
return
return nil, nil
}

ApiMetrics.RenderRequests.Add(1)
Expand All @@ -279,6 +281,23 @@ func renderHandler(w http.ResponseWriter, r *http.Request) {
if len(errors) == 0 {
backendCacheStoreResults(logger, backendCacheKey, results, backendCacheTimeout)
}
return results, errors
}

var results []*types.MetricData
var errors map[string]merry.Error
if !useCache {
results, errors = storeFunc()
} else if config.Config.BackendCacheConfig.DogpileProtection {
results, errors = backendCacheGetOrSetExclusive(r.Context(), logger, backendCacheKey, storeFunc)
} else {
results, err = backendCacheFetchResults(logger, useCache, backendCacheKey, accessLogDetails)
if err != nil {
results, errors = storeFunc()
}
}
if results == nil && err == nil {
return
}

size := 0
Expand Down Expand Up @@ -378,6 +397,70 @@ func backendCacheComputeKey(from, until string, targets []string) string {
return backendCacheKey.String()
}

// backendCacheGetOrSetExclusive tries to retrieve data from the backend cache.
// When the data does not exist, it determines whether the value must be
// computed locally, or whether someone else is already computing it — in which
// case it polls the cache for the value until the lock timeout elapses.
// It returns (nil, nil) when the request context is cancelled.
func backendCacheGetOrSetExclusive(ctx context.Context, logger *zap.Logger, key string, storeFunc func() ([]*types.MetricData, map[string]merry.Error)) ([]*types.MetricData, map[string]merry.Error) {
	// One-shot deadline for the whole wait: a Timer, not a Ticker, is the
	// right tool for a single timeout.
	timeout := time.NewTimer(time.Duration(config.Config.BackendCacheConfig.DogpileProtectionLockTimeoutSec) * time.Second)
	defer timeout.Stop()

	retryDelay := time.Duration(config.Config.BackendCacheConfig.DogpileProtectionRetryDelayMs) * time.Millisecond
	lockKey := key + " lock"

	for {
		// check cancellation and the overall timeout
		select {
		case <-ctx.Done():
			// client went away; nothing useful to return
			return nil, nil
		case <-timeout.C:
			logger.Warn("Could not fetch backendCache data within lock timeout")
			// we could not get data from cache within timeout,
			// we compute the value and store it in the cache
			return storeFunc()
		default:
		}

		// try to fetch the value from cache
		backendCacheResults, err := config.Config.BackendCache.Get(key)
		if err == cache.ErrNotFound {
			// no cached value yet: try to become the single computer
			// by atomically adding a lock key
			if addErr := config.Config.BackendCache.Add(lockKey, []byte("lock"), config.Config.BackendCacheConfig.DogpileProtectionLockTimeoutSec); addErr == cache.ErrAlreadySet {
				// someone else is computing the value; wait a little
				// (staying responsive to cancellation) and re-check
				select {
				case <-ctx.Done():
					return nil, nil
				case <-time.After(retryDelay):
				}
				continue
			}

			// we hold the lock; release it when done so that waiters
			// fall through to the cache (or grab the lock) quickly.
			// Safe despite being inside the loop: every path below returns.
			defer config.Config.BackendCache.Del(lockKey)

			// check again that the value was not set between our
			// previous Get and the lock acquisition
			backendCacheResults, err = config.Config.BackendCache.Get(key)
			if err == cache.ErrNotFound {
				// still absent: compute it (storeFunc also stores it)
				return storeFunc()
			}
		}

		if err != nil {
			// unexpected cache error: fall back to computing locally
			return storeFunc()
		}

		// we have data in the cache
		metrics, err := bytesToMetricsData(backendCacheResults)
		if err != nil {
			// corrupt or undecodable cache entry: recompute
			return storeFunc()
		}

		return metrics, nil
	}
}

// bytesToMetricsData decodes a gob-encoded []*types.MetricData, the format
// written by the backend-cache store path.
func bytesToMetricsData(data []byte) ([]*types.MetricData, error) {
	var results []*types.MetricData
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&results)
	return results, err
}

func backendCacheFetchResults(logger *zap.Logger, useCache bool, backendCacheKey string, accessLogDetails *carbonapipb.AccessLogDetails) ([]*types.MetricData, error) {
if !useCache {
return nil, errors.New("useCache is false")
Expand All @@ -389,11 +472,7 @@ func backendCacheFetchResults(logger *zap.Logger, useCache bool, backendCacheKey
return nil, err
}

var results []*types.MetricData
cacheDecodingBuf := bytes.NewBuffer(backendCacheResults)
dec := gob.NewDecoder(cacheDecodingBuf)
err = dec.Decode(&results)

results, err := bytesToMetricsData(backendCacheResults)
if err != nil {
logger.Error("Error decoding cached backend results")
return nil, err
Expand Down
12 changes: 10 additions & 2 deletions doc/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,14 @@ cache:
## backendCache
Specify what storage to use for backend cache. This cache stores the responses
from the backends. It should have more cache hits than the response cache since
the response format and the maxDataPoints paramter are not part of the cache
the response format and the maxDataPoints parameter are not part of the cache
key, but results from cache still need to be postprocessed (e.g. serialized to
desired response format).
desired response format).

When using `memcache`, you can prevent [dogpile effect](https://en.wikipedia.org/wiki/Cache_stampede) with `dogpileProtection*` opt-in settings (disabled by default).
If enabled, multiple `carbonapi` threads or servers won't try to fetch the data from backends if the same request is already being computed somewhere else.
For instance, it's useful when you have a bunch of servers, long running queries and multiple users accessing the same dashboards/queries.
Those settings have no effect outside of `memcache` cache backend.

Supports same options as the response cache.
### Example
Expand All @@ -250,6 +255,9 @@ backendCache:
memcachedServers:
- "127.0.0.1:1234"
- "127.0.0.2:1235"
dogpileProtection: true # default is false
dogpileProtectionLockTimeoutSec: 60 # this is the default value. If the lock cannot be obtained within timeout, the cache is not used
dogpileProtectionRetryDelayMs: 100 # this is the default value. When someone else is computing the response, we try to get a cached response every dogpileProtectionRetryDelayMs
```
***
## cpus
Expand Down