-
Notifications
You must be signed in to change notification settings - Fork 182
/
Copy pathjson.go
199 lines (179 loc) · 7.15 KB
/
json.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package om
import (
"context"
"encoding/json"
"reflect"
"strconv"
"strings"
"time"
"github.com/oklog/ulid/v2"
"github.com/redis/rueidis"
)
// NewJSONRepository creates a JSONRepository.
// The prefix parameter is used as the redis key prefix: entities stored by the repository are named `{prefix}:{id}`,
// and the index name is initialized to `jsonidx:{prefix}`.
// The schema parameter should be a struct with fields tagged with `redis:",key"` and `redis:",ver"`.
// Every option in opts is applied to the repository, in order, before it is returned.
func NewJSONRepository[T any](prefix string, schema T, client rueidis.Client, opts ...RepositoryOption) Repository[T] {
	typ := reflect.TypeOf(schema)
	repo := &JSONRepository[T]{
		schema: newSchema(typ),
		typ:    typ,
		client: client,
		prefix: prefix,
		idx:    "jsonidx:" + prefix,
	}
	// Options are declared against the [any] instantiation, so the pointer is converted before each call.
	for i := range opts {
		opts[i]((*JSONRepository[any])(repo))
	}
	return repo
}
// Compile-time assertion that *JSONRepository[any] satisfies Repository[any].
var _ Repository[any] = (*JSONRepository[any])(nil)
// JSONRepository is an OM repository backed by RedisJSON.
// Entities of type T are stored under keys of the form `{prefix}:{id}`.
type JSONRepository[T any] struct {
// schema is built by newSchema from typ; its key, ver and (optional) ext
// entries locate the corresponding fields of T by index.
schema schema
// typ is the reflect.Type of the schema value given to NewJSONRepository.
typ reflect.Type
// client is the rueidis client used to build and send every command.
client rueidis.Client
// prefix is the redis key prefix of the stored entities.
prefix string
// idx is the search index name, initialized to "jsonidx:" + prefix.
idx string
}
// NewEntity allocates a zero-valued T, assigns a freshly generated ULID string to
// its `redis:",key"` field and returns a pointer to it.
func (r *JSONRepository[T]) NewEntity() *T {
	entity := new(T)
	keyField := reflect.ValueOf(entity).Elem().Field(r.schema.key.idx)
	id := ulid.Make().String()
	keyField.Set(reflect.ValueOf(id))
	return entity
}
// Fetch reads the entity whose name is `{prefix}:{id}` with JSON.GET on the root path
// and decodes it into a new T.
// A failure of either the command or the decoding is returned together with a nil entity.
func (r *JSONRepository[T]) Fetch(ctx context.Context, id string) (v *T, err error) {
	cmd := r.client.B().JsonGet().Key(key(r.prefix, id)).Path(".").Build()
	raw, err := r.client.Do(ctx, cmd).ToString()
	if err != nil {
		return nil, err
	}
	return r.decode(raw)
}
// FetchCache is like Fetch, but it goes through the client side caching mechanism:
// the command is issued with DoCache and ttl is passed along as the cache lifetime.
func (r *JSONRepository[T]) FetchCache(ctx context.Context, id string, ttl time.Duration) (v *T, err error) {
	cmd := r.client.B().JsonGet().Key(key(r.prefix, id)).Path(".").Cache()
	raw, err := r.client.DoCache(ctx, cmd, ttl).ToString()
	if err != nil {
		return nil, err
	}
	return r.decode(raw)
}
// decode parses the first JSON value found in record into a newly allocated T.
// It returns a nil entity together with the decoder error when parsing fails.
func (r *JSONRepository[T]) decode(record string) (*T, error) {
	entity := new(T)
	dec := json.NewDecoder(strings.NewReader(record))
	if err := dec.Decode(entity); err != nil {
		return nil, err
	}
	return entity, nil
}
// toExec prepares the arguments of the save script for entity.
// It returns the reflect.Value of the entity's version field (so the caller can
// write the new version back) and a LuaExec whose only key is `{prefix}:{id}`
// and whose args are: the version field name, the current version, the entity
// encoded with rueidis.JSON and, optionally, the expiry in unix milliseconds.
// The expiry is only appended when the schema has an ext field holding a
// non-zero time.Time whose UnixMilli value is not 0.
func (r *JSONRepository[T]) toExec(entity *T) (verf reflect.Value, exec rueidis.LuaExec) {
	elem := reflect.ValueOf(entity).Elem()
	verf = elem.Field(r.schema.ver.idx)

	var expireAtMs int64
	if r.schema.ext != nil {
		expireAt, isTime := elem.Field(r.schema.ext.idx).Interface().(time.Time)
		if isTime && !expireAt.IsZero() {
			expireAtMs = expireAt.UnixMilli()
		}
	}

	exec.Keys = []string{key(r.prefix, elem.Field(r.schema.key.idx).String())}
	exec.Args = []string{r.schema.ver.name, strconv.FormatInt(verf.Int(), 10), rueidis.JSON(entity)}
	if expireAtMs != 0 {
		exec.Args = append(exec.Args, strconv.FormatInt(expireAtMs, 10))
	}
	return verf, exec
}
// Save the entity under the redis key of `{prefix}:{id}`.
// It also uses the `redis:",ver"` field and lua script to perform optimistic locking and prevent lost update.
//
// On success the entity's version field is updated to the version returned by the script.
// ErrVersionMismatch is returned when the script replies with a redis nil, i.e. the stored
// version differs from the entity's version. Any other command error is returned as is.
// If the script reply cannot be parsed as an integer, the parse error is returned and the
// entity's version field is left untouched instead of being silently reset to 0.
func (r *JSONRepository[T]) Save(ctx context.Context, entity *T) (err error) {
	valf, exec := r.toExec(entity)
	str, err := jsonSaveScript.Exec(ctx, r.client, exec.Keys, exec.Args).ToString()
	if rueidis.IsRedisNil(err) {
		return ErrVersionMismatch
	}
	if err != nil {
		return err
	}
	ver, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		// Do not overwrite the local version with a bogus value.
		return err
	}
	valf.SetInt(ver)
	return nil
}
// SaveMulti batches multiple JSONRepository.Save at once.
// The returned slice has one entry per entity, in the same order: nil when the entity was
// saved and its version field updated, ErrVersionMismatch when the script replied with a
// redis nil, or the underlying error otherwise.
// If a script reply cannot be parsed as an integer, the parse error is reported for that
// entity and its version field is left untouched instead of being silently reset to 0.
func (r *JSONRepository[T]) SaveMulti(ctx context.Context, entities ...*T) []error {
	errs := make([]error, len(entities))
	valf := make([]reflect.Value, len(entities))
	exec := make([]rueidis.LuaExec, len(entities))
	for i, entity := range entities {
		valf[i], exec[i] = r.toExec(entity)
	}
	for i, resp := range jsonSaveScript.ExecMulti(ctx, r.client, exec...) {
		str, err := resp.ToString()
		if rueidis.IsRedisNil(err) {
			errs[i] = ErrVersionMismatch
			continue
		}
		if err != nil {
			errs[i] = err
			continue
		}
		ver, err := strconv.ParseInt(str, 10, 64)
		if err != nil {
			// Do not overwrite the local version with a bogus value.
			errs[i] = err
			continue
		}
		valf[i].SetInt(ver)
	}
	return errs
}
// Remove deletes the redis key `{prefix}:{id}` with DEL and returns the error
// of that command, if any.
func (r *JSONRepository[T]) Remove(ctx context.Context, id string) error {
	cmd := r.client.B().Del().Key(key(r.prefix, id)).Build()
	return r.client.Do(ctx, cmd).Error()
}
// AlterIndex uses FT.ALTER from the RediSearch module to alter the repository's index
// (named `jsonidx:{prefix}` by default).
// The cmdFn parameter receives the partially built command, already bound to the index,
// and must return the completed command to send.
func (r *JSONRepository[T]) AlterIndex(ctx context.Context, cmdFn func(alter FtAlterIndex) rueidis.Completed) error {
	alter := r.client.B().FtAlter().Index(r.idx)
	return r.client.Do(ctx, cmdFn(alter)).Error()
}
// CreateIndex uses FT.CREATE from the RediSearch module to create an inverted index
// (named `jsonidx:{prefix}` by default) over JSON documents whose keys start with `{prefix}:`.
// The cmdFn parameter receives the command positioned at its SCHEMA section and must return
// the completed command. Note that field names should be specified with JSON path syntax,
// otherwise the index may not work as expected.
func (r *JSONRepository[T]) CreateIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error {
	schema := r.client.B().FtCreate().Index(r.idx).OnJson().Prefix(1).Prefix(r.prefix + ":").Schema()
	return r.client.Do(ctx, cmdFn(schema)).Error()
}
// DropIndex uses FT.DROPINDEX from the RediSearch module to drop the repository's index
// (named `jsonidx:{prefix}` by default) and returns the error of that command, if any.
func (r *JSONRepository[T]) DropIndex(ctx context.Context) error {
	cmd := r.client.B().FtDropindex().Index(r.idx).Build()
	return r.client.Do(ctx, cmd).Error()
}
// Search uses FT.SEARCH from the RediSearch module to query the repository's index
// (named `jsonidx:{prefix}` by default).
// It returns three values:
// 1. the total count of matches inside redis, which might be larger than the number of returned entities.
// 2. the decoded entities, whose length might be smaller than the first return value.
// 3. an error, if any. When a document fails to decode, the result is (0, nil, err).
// You can use the cmdFn parameter to mutate the search command.
func (r *JSONRepository[T]) Search(ctx context.Context, cmdFn func(search FtSearchIndex) rueidis.Completed) (n int64, s []*T, err error) {
	n, docs, err := r.client.Do(ctx, cmdFn(r.client.B().FtSearch().Index(r.idx))).AsFtSearch()
	if err != nil {
		return n, nil, err
	}
	s = make([]*T, len(docs))
	for i := range docs {
		// Dialect 3 wraps the document in a JSON array; strip one pair of brackets.
		raw := strings.TrimSuffix(strings.TrimPrefix(docs[i].Doc["$"], "["), "]")
		entity, decodeErr := r.decode(raw)
		if decodeErr != nil {
			return 0, nil, decodeErr
		}
		s[i] = entity
	}
	return n, s, nil
}
// Aggregate performs FT.AGGREGATE against the repository's index and returns an
// *AggregateCursor for accessing the results.
// The cmdFn parameter receives the command already bound to the index and must return
// the completed command. On failure the cursor is nil and the error is returned.
func (r *JSONRepository[T]) Aggregate(ctx context.Context, cmdFn func(agg FtAggregateIndex) rueidis.Completed) (cursor *AggregateCursor, err error) {
	cmd := cmdFn(r.client.B().FtAggregate().Index(r.idx))
	cid, total, rows, err := r.client.Do(ctx, cmd).AsFtAggregateCursor()
	if err == nil {
		cursor = newAggregateCursor(r.idx, r.client, rows, cid, total)
	}
	return cursor, err
}
// IndexName returns the index name used in the FT.CREATE.
// It is the same name that AlterIndex, DropIndex, Search and Aggregate operate on;
// NewJSONRepository initializes it to `jsonidx:{prefix}`.
// NOTE(review): a RepositoryOption receives the repository and could presumably override it — confirm.
func (r *JSONRepository[T]) IndexName() string {
return r.idx
}
// jsonSaveScript is the lua script behind Save and SaveMulti; it implements the
// optimistic-locking write. Its inputs, as assembled by toExec, are:
//   - KEYS[1]: the entity key
//   - ARGV[1]: the name (path) of the version field
//   - ARGV[2]: the version the caller currently holds
//   - ARGV[3]: the JSON-encoded entity
//   - ARGV[4]: optional, the expiry passed to PEXPIREAT
//
// The script reads the stored version with JSON.GET. If nothing is stored or the
// stored version equals ARGV[2], it writes ARGV[3] at the root with JSON.SET,
// increments the version field by 1 with JSON.NUMINCRBY, applies PEXPIREAT when a
// fourth argument was given, and returns the incremented version. Otherwise it
// returns nil, which Save and SaveMulti report as ErrVersionMismatch.
var jsonSaveScript = rueidis.NewLuaScript(`
local v = redis.call('JSON.GET',KEYS[1],ARGV[1])
if (not v or v == ARGV[2])
then
redis.call('JSON.SET',KEYS[1],'$',ARGV[3])
local v = redis.call('JSON.NUMINCRBY',KEYS[1],ARGV[1],1)
if #ARGV == 4 then redis.call('PEXPIREAT',KEYS[1],ARGV[4]) end
return v
end
return nil
`)