Skip to content

Commit

Permalink
Add best-effort TTL support to the API (#40)
Browse files Browse the repository at this point in the history
* Updated the docs.

* Added TTL and request validation to the PUT contract classes.

* Added max_ttl_seconds to the config request limits.

* Added Request-based TTLs to the backends.

* Moved TTL-limiting into a backend decorator.

* Undid some unnecessary changes.

* Made aerospike use the configurable default  TTL.
  • Loading branch information
dbemiller authored Sep 24, 2018
1 parent 1fe5205 commit 486eb30
Show file tree
Hide file tree
Showing 19 changed files with 133 additions and 37 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ Adds one or more values to the cache. Values can be given as either JSON or XML.
"puts": [
{
"type": "xml",
"ttlseconds": 60,
"value": "<tag>Your XML content goes here.</tag>"
},
{
"type": "json",
"ttlseconds": 300,
"value": [1, true, "JSON value of any type can go here."]
}
]
Expand All @@ -29,6 +31,9 @@ If any of the `puts` are invalid, then it responds with a **400** none of the va
Assuming that all of the values are well-formed, then the server will respond with IDs which can be used to
fetch the values later.

**Note**: `ttlseconds` is optional, and will only be honored on a _best effort_ basis.
Callers should never _assume_ that the data will stay in the cache for that long.

```json
{
"responses": [
Expand Down
7 changes: 5 additions & 2 deletions backends/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,19 @@ func (a *Aerospike) Get(ctx context.Context, key string) (string, error) {
return rec.Bins[binValue].(string), nil
}

func (a *Aerospike) Put(ctx context.Context, key string, value string) error {
func (a *Aerospike) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
asKey, err := as.NewKey(a.cfg.Namespace, setName, key)
if err != nil {
return err
}
if ttlSeconds == 0 {
ttlSeconds = a.cfg.DefaultTTL
}
bins := as.BinMap{
binValue: value,
}
err = a.client.Put(&as.WritePolicy{
Expiration: uint32(a.cfg.DefaultTTL),
Expiration: uint32(ttlSeconds),
}, asKey, bins)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions backends/azure_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
"time"

"context"
"sync"

log "github.com/Sirupsen/logrus"
"github.com/valyala/fasthttp"
"sync"
)

type AzureValue struct {
Expand Down Expand Up @@ -143,7 +144,7 @@ func (c *AzureTableBackend) Get(ctx context.Context, key string) (string, error)
return av.Value, nil
}

func (c *AzureTableBackend) Put(ctx context.Context, key string, value string) error {
func (c *AzureTableBackend) Put(ctx context.Context, key string, value string, ttlSeconds int) error {

if key == "" {
return fmt.Errorf("Invalid Key")
Expand Down
2 changes: 1 addition & 1 deletion backends/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import (

// Backend interface for storing data
type Backend interface {
Put(ctx context.Context, key string, value string) error
Put(ctx context.Context, key string, value string, ttlSeconds int) error
Get(ctx context.Context, key string) (string, error)
}
7 changes: 5 additions & 2 deletions backends/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ func (c *Cassandra) Get(ctx context.Context, key string) (string, error) {
return res, err
}

func (c *Cassandra) Put(ctx context.Context, key string, value string) error {
err := c.session.Query(`INSERT INTO cache (key, value) VALUES (?, ?) USING TTL 2400`, key, value).
func (c *Cassandra) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
if ttlSeconds == 0 {
ttlSeconds = 2400
}
err := c.session.Query(`INSERT INTO cache (key, value) VALUES (?, ?) USING TTL ?`, key, value, ttlSeconds).
WithContext(ctx).
Exec()

Expand Down
1 change: 1 addition & 0 deletions backends/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

func NewBackend(cfg config.Configuration, appMetrics *metrics.Metrics) backends.Backend {
backend := newBaseBackend(cfg.Backend)
backend = decorators.LimitTTLs(backend, cfg.RequestLimits.MaxTTLSeconds)
if cfg.RequestLimits.MaxSize > 0 {
backend = decorators.EnforceSizeLimit(backend, cfg.RequestLimits.MaxSize)
}
Expand Down
27 changes: 27 additions & 0 deletions backends/decorators/limit_ttls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package decorators

import (
"context"

"github.com/prebid/prebid-cache/backends"
)

// LimitTTLs wraps the delegate and makes sure that it never gets TTLs which exceed the max.
func LimitTTLs(delegate backends.Backend, maxTTLSeconds int) backends.Backend {
return ttlLimited{
Backend: delegate,
maxTTLSeconds: maxTTLSeconds,
}
}

type ttlLimited struct {
backends.Backend
maxTTLSeconds int
}

func (l ttlLimited) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
if l.maxTTLSeconds > ttlSeconds {
return l.Backend.Put(ctx, key, value, ttlSeconds)
}
return l.Backend.Put(ctx, key, value, l.maxTTLSeconds)
}
39 changes: 39 additions & 0 deletions backends/decorators/limit_ttls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package decorators_test

import (
"context"
"testing"

"github.com/prebid/prebid-cache/backends/decorators"
)

func TestExcessiveTTL(t *testing.T) {
delegate := &ttlCapturer{}
wrapped := decorators.LimitTTLs(delegate, 100)
wrapped.Put(context.Background(), "foo", "bar", 200)
if delegate.lastTTL != 100 {
t.Errorf("lastTTL should be %d. Got %d", 100, delegate.lastTTL)
}
}

func TestSafeTTL(t *testing.T) {
delegate := &ttlCapturer{}
wrapped := decorators.LimitTTLs(delegate, 100)
wrapped.Put(context.Background(), "foo", "bar", 50)
if delegate.lastTTL != 50 {
t.Errorf("lastTTL should be %d. Got %d", 50, delegate.lastTTL)
}
}

type ttlCapturer struct {
lastTTL int
}

func (c *ttlCapturer) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
c.lastTTL = ttlSeconds
return nil
}

func (c *ttlCapturer) Get(ctx context.Context, key string) (string, error) {
return "", nil
}
4 changes: 2 additions & 2 deletions backends/decorators/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (b *backendWithMetrics) Get(ctx context.Context, key string) (string, error
return val, err
}

func (b *backendWithMetrics) Put(ctx context.Context, key string, value string) error {
func (b *backendWithMetrics) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
if strings.HasPrefix(value, backends.XML_PREFIX) {
b.puts.XmlRequest.Mark(1)
} else if strings.HasPrefix(value, backends.JSON_PREFIX) {
Expand All @@ -36,7 +36,7 @@ func (b *backendWithMetrics) Put(ctx context.Context, key string, value string)
b.puts.InvalidRequest.Mark(1)
}
start := time.Now()
err := b.delegate.Put(ctx, key, value)
err := b.delegate.Put(ctx, key, value, ttlSeconds)
if err == nil {
b.puts.Duration.UpdateSince(start)
} else {
Expand Down
14 changes: 7 additions & 7 deletions backends/decorators/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ func (b *failedBackend) Get(ctx context.Context, key string) (string, error) {
return "", fmt.Errorf("Failure")
}

func (b *failedBackend) Put(ctx context.Context, key string, value string) error {
func (b *failedBackend) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
return fmt.Errorf("Failure")
}

func TestGetSuccessMetrics(t *testing.T) {
m := metrics.CreateMetrics()
rawBackend := backends.NewMemoryBackend()
rawBackend.Put(context.Background(), "foo", "xml<vast></vast>")
rawBackend.Put(context.Background(), "foo", "xml<vast></vast>", 0)
backend := LogMetrics(rawBackend, m)
backend.Get(context.Background(), "foo")

Expand All @@ -41,7 +41,7 @@ func TestGetErrorMetrics(t *testing.T) {
func TestPutSuccessMetrics(t *testing.T) {
m := metrics.CreateMetrics()
backend := LogMetrics(backends.NewMemoryBackend(), m)
backend.Put(context.Background(), "foo", "xml<vast></vast>")
backend.Put(context.Background(), "foo", "xml<vast></vast>", 0)

assertSuccessMetricsExist(t, m.PutsBackend)
if m.PutsBackend.XmlRequest.Count() != 1 {
Expand All @@ -52,7 +52,7 @@ func TestPutSuccessMetrics(t *testing.T) {
func TestPutErrorMetrics(t *testing.T) {
m := metrics.CreateMetrics()
backend := LogMetrics(&failedBackend{}, m)
backend.Put(context.Background(), "foo", "xml<vast></vast>")
backend.Put(context.Background(), "foo", "xml<vast></vast>", 0)

assertErrorMetricsExist(t, m.PutsBackend)
if m.PutsBackend.XmlRequest.Count() != 1 {
Expand All @@ -63,7 +63,7 @@ func TestPutErrorMetrics(t *testing.T) {
func TestJsonPayloadMetrics(t *testing.T) {
m := metrics.CreateMetrics()
backend := LogMetrics(backends.NewMemoryBackend(), m)
backend.Put(context.Background(), "foo", "json{\"key\":\"value\"")
backend.Put(context.Background(), "foo", "json{\"key\":\"value\"", 0)
backend.Get(context.Background(), "foo")

if m.PutsBackend.JsonRequest.Count() != 1 {
Expand All @@ -75,7 +75,7 @@ func TestPutSizeSampling(t *testing.T) {
m := metrics.CreateMetrics()
payload := `json{"key":"value"}`
backend := LogMetrics(backends.NewMemoryBackend(), m)
backend.Put(context.Background(), "foo", payload)
backend.Put(context.Background(), "foo", payload, 0)

if m.PutsBackend.RequestLength.Count() != 1 {
t.Errorf("A request size sample should have been logged.")
Expand All @@ -85,7 +85,7 @@ func TestPutSizeSampling(t *testing.T) {
func TestInvalidPayloadMetrics(t *testing.T) {
m := metrics.CreateMetrics()
backend := LogMetrics(backends.NewMemoryBackend(), m)
backend.Put(context.Background(), "foo", "bar")
backend.Put(context.Background(), "foo", "bar", 0)
backend.Get(context.Background(), "foo")

if m.PutsBackend.InvalidRequest.Count() != 1 {
Expand Down
4 changes: 2 additions & 2 deletions backends/decorators/size_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (b *sizeCappedBackend) Get(ctx context.Context, key string) (string, error)
return b.delegate.Get(ctx, key)
}

func (b *sizeCappedBackend) Put(ctx context.Context, key string, value string) error {
func (b *sizeCappedBackend) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
valueLen := len(value)
if valueLen == 0 || valueLen > b.limit {
return &BadPayloadSize{
Expand All @@ -34,7 +34,7 @@ func (b *sizeCappedBackend) Put(ctx context.Context, key string, value string) e
}
}

return b.delegate.Put(ctx, key, value)
return b.delegate.Put(ctx, key, value, ttlSeconds)
}

type BadPayloadSize struct {
Expand Down
6 changes: 3 additions & 3 deletions backends/decorators/size_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
func TestLargePayload(t *testing.T) {
delegate := &successfulBackend{}
wrapped := EnforceSizeLimit(delegate, 5)
assertBadPayloadError(t, wrapped.Put(context.Background(), "foo", "123456"))
assertBadPayloadError(t, wrapped.Put(context.Background(), "foo", "123456", 0))
}

func TestAcceptablePayload(t *testing.T) {
delegate := &successfulBackend{}
wrapped := EnforceSizeLimit(delegate, 5)
assertNilError(t, wrapped.Put(context.Background(), "foo", "12345"))
assertNilError(t, wrapped.Put(context.Background(), "foo", "12345", 0))
}

func assertBadPayloadError(t *testing.T, err error) {
Expand Down Expand Up @@ -42,6 +42,6 @@ func (b *successfulBackend) Get(ctx context.Context, key string) (string, error)
return "some-value", nil
}

func (b *successfulBackend) Put(ctx context.Context, key string, value string) error {
func (b *successfulBackend) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
return nil
}
8 changes: 6 additions & 2 deletions backends/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ func (mc *Memcache) Get(ctx context.Context, key string) (string, error) {
return string(res.Value), nil
}

func (mc *Memcache) Put(ctx context.Context, key string, value string) error {
err := mc.client.Set(&memcache.Item{Key: key, Value: []byte(value)})
func (mc *Memcache) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
err := mc.client.Set(&memcache.Item{
Expiration: int32(ttlSeconds),
Key: key,
Value: []byte(value),
})

if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion backends/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (b *MemoryBackend) Get(ctx context.Context, key string) (string, error) {
return v, nil
}

func (b *MemoryBackend) Put(ctx context.Context, key string, value string) error {
func (b *MemoryBackend) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
b.db[key] = value
return nil
}
Expand Down
12 changes: 8 additions & 4 deletions backends/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package backends

import (
"context"
"strconv"
"time"

log "github.com/Sirupsen/logrus"
"github.com/go-redis/redis"
"github.com/prebid/prebid-cache/config"
"strconv"
"time"
)

type Redis struct {
Expand Down Expand Up @@ -46,8 +47,11 @@ func (redis *Redis) Get(ctx context.Context, key string) (string, error) {
return string(res), nil
}

func (redis *Redis) Put(ctx context.Context, key string, value string) error {
err := redis.client.Set(key, value, time.Duration(redis.cfg.Expiration)*time.Minute).Err()
func (redis *Redis) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
if ttlSeconds == 0 {
ttlSeconds = redis.cfg.Expiration * 60
}
err := redis.client.Set(key, value, time.Duration(ttlSeconds)*time.Second).Err()

if err != nil {
return err
Expand Down
9 changes: 5 additions & 4 deletions compression/snappy.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package compression

import (
"github.com/prebid/prebid-cache/backends"
"context"

"github.com/golang/snappy"
"github.com/prebid/prebid-cache/backends"
)

// SnappyCompress runs snappy compression on data before saving it in the backend.
Expand All @@ -14,12 +15,12 @@ func SnappyCompress(backend backends.Backend) backends.Backend {
}
}

type snappyCompressor struct{
type snappyCompressor struct {
delegate backends.Backend
}

func (s *snappyCompressor) Put(ctx context.Context, key string, value string) error {
return s.delegate.Put(ctx, key, string(snappy.Encode(nil, []byte(value))))
func (s *snappyCompressor) Put(ctx context.Context, key string, value string, ttlSeconds int) error {
return s.delegate.Put(ctx, key, string(snappy.Encode(nil, []byte(value))), ttlSeconds)
}

func (s *snappyCompressor) Get(ctx context.Context, key string) (string, error) {
Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ func (cfg *RateLimiting) validateAndLog() {
}

type RequestLimits struct {
MaxSize int `mapstructure:"max_size_bytes"`
MaxNumValues int `mapstructure:"max_num_values"`
MaxSize int `mapstructure:"max_size_bytes"`
MaxNumValues int `mapstructure:"max_num_values"`
MaxTTLSeconds int `mapstructure:"max_ttl_seconds"`
}

func (cfg *RequestLimits) validateAndLog() {
Expand Down
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestSampleConfig(t *testing.T) {
assertInt64sEqual(t, "rate_limiter.num_requests", cfg.RateLimiting.MaxRequestsPerSecond, 100)
assertIntsEqual(t, "request_limits.max_size_bytes", cfg.RequestLimits.MaxSize, 10240)
assertIntsEqual(t, "request_limits.max_num_values", cfg.RequestLimits.MaxNumValues, 10)
assertIntsEqual(t, "request_limits.max_ttl_seconds", cfg.RequestLimits.MaxTTLSeconds, 5000)
assertStringsEqual(t, "backend.type", string(cfg.Backend.Type), "memory")
assertIntsEqual(t, "backend.aerospike.default_ttl_seconds", cfg.Backend.Aerospike.DefaultTTL, 3600)
assertStringsEqual(t, "backend.aerospike.host", cfg.Backend.Aerospike.Host, "aerospike.prebid.com")
Expand Down Expand Up @@ -86,6 +87,7 @@ rate_limiter:
request_limits:
max_size_bytes: 10240
max_num_values: 10
max_ttl_seconds: 5000
backend:
type: "memory"
aerospike:
Expand Down
Loading

0 comments on commit 486eb30

Please sign in to comment.