Skip to content

Commit

Permalink
add hmslot
Browse files Browse the repository at this point in the history
  • Loading branch information
zhs committed Jan 10, 2019
1 parent 817fd8a commit 381fcc6
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 43 deletions.
18 changes: 18 additions & 0 deletions command/hashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,21 @@ func HMSet(ctx *Context, txn *db.Transaction) (OnCommit, error) {
}
return SimpleString(ctx.Out, "OK"), nil
}

// HMSlot
func HMSlot(ctx *Context, txn *db.Transaction) (OnCommit, error) {
key := []byte(ctx.Args[0])
count, err := strconv.ParseInt(ctx.Args[1], 10, 64)
if err != nil || count < 0 {
return nil, ErrInteger
}
hash, err := txn.Hash(key)
if err != nil {
return nil, errors.New("ERR " + err.Error())
}

if err := hash.HMSlot(count); err != nil {
return nil, errors.New("ERR " + err.Error())
}
return SimpleString(ctx.Out, "OK"), nil
}
2 changes: 2 additions & 0 deletions command/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func init() {
"hsetnx": HSetNX,
"hmget": HMGet,
"hmset": HMSet,
"hmslot": HMSlot,

// sets
"sadd": SAdd,
Expand Down Expand Up @@ -169,6 +170,7 @@ func init() {
"hsetnx": Desc{Proc: AutoCommit(HSetNX), Cons: Constraint{4, flags("wmF"), 1, 1, 1}},
"hmget": Desc{Proc: AutoCommit(HMGet), Cons: Constraint{-3, flags("rF"), 1, 1, 1}},
"hmset": Desc{Proc: AutoCommit(HMSet), Cons: Constraint{-3, flags("wmF"), 1, 1, 1}},
"hmslot": Desc{Proc: AutoCommit(HMSlot), Cons: Constraint{3, flags("wF"), 1, 1, 1}},

// sets
"sadd": Desc{Proc: AutoCommit(SAdd), Cons: Constraint{-3, flags("wmF"), 1, 1, 1}},
Expand Down
11 changes: 10 additions & 1 deletion command/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,20 @@ func Object(ctx *Context, txn *db.Transaction) (OnCommit, error) {
}
return nil, errors.New("ERR " + err.Error())
}

switch subCmd {
case "refcount", "freq":
return Integer(ctx.Out, 0), nil
case "idletime":
if obj.Type == db.ObjectHash {
hash, err := txn.Hash(key)
if err != nil {
return nil, errors.New("ERR " + err.Error())
}
obj, err = hash.Object()
if err != nil {
return nil, errors.New("ERR " + err.Error())
}
}
sec := int64(time.Since(time.Unix(0, obj.UpdatedAt)).Seconds())
return Integer(ctx.Out, sec), nil
case "encoding":
Expand Down
14 changes: 12 additions & 2 deletions command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,21 @@ func Debug(ctx *Context, txn *db.Transaction) (OnCommit, error) {
}
}
func debugObject(ctx *Context, txn *db.Transaction) (OnCommit, error) {
key := ctx.Args[1]
obj, err := txn.Object([]byte(key))
key := []byte(ctx.Args[1])
obj, err := txn.Object(key)
if err != nil {
return nil, err
}
if obj.Type == db.ObjectHash {
hash, err := txn.Hash(key)
if err != nil {
return nil, errors.New("ERR " + err.Error())
}
obj, err = hash.Object()
if err != nil {
return nil, errors.New("ERR " + err.Error())
}
}
return SimpleString(ctx.Out, obj.String()), nil
}

Expand Down
63 changes: 37 additions & 26 deletions db/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func NewHash(txn *Transaction, key []byte) *Hash {
}

func hashItemKey(key []byte, field []byte) []byte {
key = append(key, []byte(Separator)...)
key = append(key, ':')
return append(key, field...)
}

Expand Down Expand Up @@ -160,22 +160,22 @@ func (hash *Hash) HDel(fields [][]byte) (int64, error) {
for _, field := range fields {
keys = append(keys, hashItemKey(dkey, field))
}
kvMap, slotsMap, hlen, err := hash.delHash(keys)
values, hlen, err := hash.delHash(keys)
if err != nil {
return 0, err
}
if hlen == 0 {
return 0, nil
}
vlen := int64(len(kvMap))
vlen := int64(len(values))
if vlen >= hlen {
if err := hash.Destroy(); err != nil {
return 0, err
}
return vlen, nil
}

for k, v := range kvMap {
for k, v := range values {
if v == nil {
continue
}
Expand All @@ -187,16 +187,15 @@ func (hash *Hash) HDel(fields [][]byte) (int64, error) {
if num == 0 {
return 0, nil
}

// update Len and UpdateAt
if hash.isMetaSlot() {
slot := &Slot{}
slotID := hash.calculateSlotID(hash.meta.MetaSlot)
metaSlotKey := MetaSlotKey(hash.txn.db, hash.meta.ID, EncodeInt64(slotID))
if b, ok := slotsMap[string(metaSlotKey)]; ok {
if s, err := DecodeSlot(b); err == nil {
slot = s
}
slot, err := hash.getSlot(slotID)
if err != nil {
return 0, err
}
slot.Len = slot.Len - num
slot.Len -= num
slot.UpdatedAt = now
if err := hash.updateSlot(slotID, slot); err != nil {
return 0, err
Expand All @@ -205,8 +204,8 @@ func (hash *Hash) HDel(fields [][]byte) (int64, error) {
} else {
hash.meta.Len -= num
hash.meta.UpdatedAt = now
if err := hash.autoUpdateSlot(defaultHashMetaSlot); err != nil {
return 0, err
if err := hash.autoUpdateSlot(defaultHashMetaSlot); err == nil {
hash.meta.MetaSlot = defaultHashMetaSlot
}
if err := hash.updateMeta(); err != nil {
return 0, err
Expand All @@ -216,9 +215,8 @@ func (hash *Hash) HDel(fields [][]byte) (int64, error) {
return num, nil
}

func (hash *Hash) delHash(keys [][]byte) (map[string][]byte, map[string][]byte, int64, error) {
func (hash *Hash) delHash(keys [][]byte) (map[string][]byte, int64, error) {
var (
slotsMap map[string][]byte
slots [][]byte
isMetaSlot = hash.isMetaSlot()
metaSlotKey = MetaSlotKey(hash.txn.db, hash.meta.ID, nil)
Expand All @@ -230,23 +228,22 @@ func (hash *Hash) delHash(keys [][]byte) (map[string][]byte, map[string][]byte,

kvMap, err := store.BatchGetValues(hash.txn.t, keys)
if err != nil {
return nil, nil, 0, err
return nil, 0, err
}
for k, v := range kvMap {
if isMetaSlot && bytes.Contains([]byte(k), metaSlotKey) {
slotsMap[string(k)] = v
slots = append(slots, v)
delete(kvMap, k)
}
}
if isMetaSlot && len(slots) > 0 {
slot, err := hash.calculateSlot(&slots)
if err != nil {
return nil, nil, 0, err
return nil, 0, err
}
return kvMap, slotsMap, slot.Len, nil
return kvMap, slot.Len, nil
}
return kvMap, nil, hash.meta.Len, nil
return kvMap, hash.meta.Len, nil
}

// HSet sets field in the hash stored at key to value
Expand Down Expand Up @@ -505,6 +502,19 @@ func (hash *Hash) HMSet(fields [][]byte, values [][]byte) error {
hash.meta.Len += added
return hash.updateMeta()
}

// HMSet sets meta slot num
func (hash *Hash) HMSlot(metaSlot int64) error {
if err := hash.autoUpdateSlot(metaSlot); err != nil {
return err
}
hash.meta.MetaSlot = metaSlot
if err := hash.updateMeta(); err != nil {
return err
}
return nil
}

func (hash *Hash) autoUpdateSlot(metaSlot int64) error {
isMetaSlot := hash.isMetaSlot()
if metaSlot < 0 {
Expand All @@ -515,7 +525,7 @@ func (hash *Hash) autoUpdateSlot(metaSlot int64) error {
}
if metaSlot > hash.meta.MetaSlot {
if !isMetaSlot && hash.meta.Len > 0 {
slot := &Slot{Len: hash.meta.Len, UpdatedAt: hash.meta.UpdatedAt}
slot := &Slot{Len: hash.meta.Len, UpdatedAt: Now()}
if err := hash.updateSlot(0, slot); err != nil {
return err
}
Expand All @@ -532,7 +542,7 @@ func (hash *Hash) autoUpdateSlot(metaSlot int64) error {
return err
}
sid := hash.calculateSlotID(metaSlot)
if err := hash.compareAndUpdateSlot(sid, slot); err != nil {
if err := hash.mergeSlot(sid, slot); err != nil {
return err
}
if err := hash.clearSliceSlot(metaSlot, hash.meta.MetaSlot-1); err != nil {
Expand All @@ -556,22 +566,23 @@ func (hash *Hash) clearSliceSlot(start, end int64) error {
return nil
}

func (hash *Hash) compareAndUpdateSlot(newID int64, old *Slot) error {
func (hash *Hash) mergeSlot(newID int64, old *Slot) error {
new, err := hash.getSlot(newID)
if err != nil {
return err
}
new.Len += old.Len
if new.UpdatedAt < old.UpdatedAt {
new.UpdatedAt = old.UpdatedAt
}
new.UpdatedAt = Now()
return hash.updateSlot(newID, new)
}

func (hash *Hash) getSlot(slotID int64) (*Slot, error) {
metaSlotKey := MetaSlotKey(hash.txn.db, hash.meta.ID, EncodeInt64(slotID))
raw, err := hash.txn.t.Get(metaSlotKey)
if err != nil {
if IsErrNotFound(err) {
return &Slot{UpdatedAt: Now()}, nil
}
return nil, err
}
slot, err := DecodeSlot(raw)
Expand Down
10 changes: 9 additions & 1 deletion db/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,15 @@ func (kv *Kv) Delete(keys [][]byte) (int64, error) {
if IsExpired(obj, now) {
continue
}
if err := kv.txn.Destory(obj, mapping[k]); err != nil {
if obj.Type == ObjectHash {
hash := NewHash(kv.txn, mapping[k])
if err := hash.meta.Decode(val); err != nil {
continue
}
if err := hash.Destroy(); err != nil {
continue
}
} else if err := kv.txn.Destory(obj, mapping[k]); err != nil {
continue
}
count++
Expand Down
13 changes: 0 additions & 13 deletions db/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,6 @@ func (txn *Transaction) Object(key []byte) (*Object, error) {
if IsExpired(obj, Now()) {
return nil, ErrKeyNotFound
}
if obj.Type == ObjectHash {
hash := NewHash(txn, key)
if err := hash.meta.Decode(meta); err != nil {
return nil, err
}
return hash.Object()
}

return obj, nil
}

Expand All @@ -158,11 +150,6 @@ func (txn *Transaction) Destory(obj *Object, key []byte) error {
return err
}
}
if obj.Type == ObjectHash {
if err := slotGC(txn, obj.ID); err != nil {
return err
}
}

return nil
}

0 comments on commit 381fcc6

Please sign in to comment.