-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbloom.go
112 lines (98 loc) · 2.91 KB
/
bloom.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright 2024 geebytes. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.
package golayeredcache
import (
"context"
"fmt"
"github.com/begonia-org/go-layered-cache/local"
"github.com/begonia-org/go-layered-cache/source"
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
)
// LayeredBloomFilter is a filter type built on an embedded
// *BaseLayeredCacheImpl; its methods delegate to that base for storage,
// scanning and watching.
type LayeredBloomFilter struct {
// Embedded base; promoted methods used in this file include SetToLocal,
// Dump, Load and Scan.
*BaseLayeredCacheImpl
// log receives the info and error messages emitted by this type's methods.
log *logrus.Logger
// keyPrefix is the value LoadDump passes to Scan.
keyPrefix string
}
// OnMessage is the handler that Watch registers with the base cache.
//
// message must be a redis.XMessage. A message with no values is ignored.
// Otherwise its "key" and "value" entries must both be strings; the value is
// converted to bytes and written through SetToLocal. The from argument is not
// used.
//
// It returns an error when message or either entry has an unexpected type, or
// whatever error SetToLocal reports.
func (lb *LayeredBloomFilter) OnMessage(ctx context.Context, from string, message interface{}) error {
	msg, isXMessage := message.(redis.XMessage)
	if !isXMessage {
		return fmt.Errorf("type except redis.XMessage,but got %T", message)
	}

	fields := msg.Values
	if len(fields) == 0 {
		// Nothing to apply.
		return nil
	}

	rawKey := fields["key"]
	key, keyIsString := rawKey.(string)
	if !keyIsString {
		return fmt.Errorf("key is not string, got %T", rawKey)
	}

	rawValue := fields["value"]
	payload, valueIsString := rawValue.(string)
	if !valueIsString {
		return fmt.Errorf("value is not string, got %T", rawValue)
	}

	return lb.SetToLocal(ctx, key, []byte(payload))
}
// onScan is the per-key callback that LoadDump hands to Scan.
//
// key must be a string. Every item received from Dump for that key is
// inspected: error items are logged, source.RedisDump items are passed to
// Load (a Load failure is logged as well), and items of any other type are
// ignored. The only error returned is for a non-string key; otherwise the
// result is nil once the Dump channel has been drained.
func (lb *LayeredBloomFilter) onScan(ctx context.Context, key interface{}) error {
	name, isString := key.(string)
	if !isString {
		return fmt.Errorf("key is not string, got %T", key)
	}

	for item := range lb.Dump(ctx, name) {
		switch chunk := item.(type) {
		case error:
			lb.log.Errorf("dump:%v", chunk)
		case source.RedisDump:
			lb.log.Infof("load %s", name)
			if loadErr := lb.Load(ctx, name, chunk); loadErr != nil {
				lb.log.Errorf("load:%v", loadErr)
			}
		}
	}
	return nil
}
// LoadDump passes lb.keyPrefix and the onScan callback to Scan and drains the
// channel Scan returns, logging every non-nil error received from it.
//
// Loading is best-effort: scan errors are logged, not propagated, and the
// returned error is always nil.
func (lb *LayeredBloomFilter) LoadDump(ctx context.Context) error {
	for err := range lb.Scan(ctx, lb.keyPrefix, lb.onScan) {
		if err != nil {
			// Use an explicit format: Error joins its arguments like
			// fmt.Sprint, which puts no space after a string operand, so
			// "scan" and the error text would run together.
			lb.log.Errorf("scan:%v", err)
		}
	}
	return nil
}
// Watch calls Watch on the embedded BaseLayeredCacheImpl with OnMessage as
// the message handler and returns the error channel that call produces.
func (lb *LayeredBloomFilter) Watch(ctx context.Context) <-chan error {
	base := lb.BaseLayeredCacheImpl
	return base.Watch(ctx, lb.OnMessage)
}
// UnWatch delegates to UnWatch on the embedded BaseLayeredCacheImpl and
// returns its result unchanged.
func (lb *LayeredBloomFilter) UnWatch() error {
	base := lb.BaseLayeredCacheImpl
	return base.UnWatch()
}
// newLayeredBloomFilter wraps layered in a LayeredBloomFilter that uses
// keyPrefix and log, and returns it as a LayeredFilter.
func newLayeredBloomFilter(layered *BaseLayeredCacheImpl, keyPrefix string, log *logrus.Logger) LayeredFilter {
	filter := new(LayeredBloomFilter)
	filter.BaseLayeredCacheImpl = layered
	filter.keyPrefix = keyPrefix
	filter.log = log
	return filter
}
// Check looks up value under key through Get on the embedded
// BaseLayeredCacheImpl and reports the boolean found in the first result.
//
// It returns:
//   - (false, err) when Get fails; the error is also logged.
//   - (false, nil) when Get yields no results.
//   - (false, err) when the first result is not a bool. This previously
//     panicked on the unchecked type assertion.
//   - (result, nil) otherwise.
func (lb *LayeredBloomFilter) Check(ctx context.Context, key string, value []byte) (bool, error) {
	vals, err := lb.BaseLayeredCacheImpl.Get(ctx, key, value)
	if err != nil {
		lb.log.Errorf("check value of %s error:%v", key, err)
		return false, err
	}
	if len(vals) == 0 {
		return false, nil
	}

	// Comma-ok assertion: an unexpected result type becomes an error for the
	// caller instead of a panic.
	hit, ok := vals[0].(bool)
	if !ok {
		return false, fmt.Errorf("check value of %s: expected bool result, got %T", key, vals[0])
	}
	return hit, nil
}
// Add stores value under key by delegating to Set on the embedded
// BaseLayeredCacheImpl, returning whatever error Set reports.
func (lb *LayeredBloomFilter) Add(ctx context.Context, key string, value []byte) error {
	base := lb.BaseLayeredCacheImpl
	return base.Set(ctx, key, value)
}
// AddLocalFilter registers filter under key with the base cache's local
// layer by calling its AddFilter method.
//
// The local layer must be a local.LocalFilters. If it is not (including when
// it is nil), an error is returned; this previously panicked on the
// unchecked type assertion. Otherwise the result of AddFilter is returned.
func (lb *LayeredBloomFilter) AddLocalFilter(key string, filter local.Filter) error {
	filters, ok := lb.BaseLayeredCacheImpl.local.(local.LocalFilters)
	if !ok {
		return fmt.Errorf("add local filter %s: local cache of type %T is not a local.LocalFilters", key, lb.BaseLayeredCacheImpl.local)
	}
	return filters.AddFilter(key, filter)
}