From 5ad8c2acc83b0c73b9000947e9d5a81515da25b5 Mon Sep 17 00:00:00 2001 From: Dustin Xie <dahuaxie@gmail.com> Date: Mon, 29 Apr 2024 12:41:51 -0700 Subject: [PATCH 1/9] [db] implement CommitToDB() for BoltDBVersioned --- db/db_versioned.go | 238 ++++++++++++++++++++++++++++ db/db_versioned_test.go | 340 +++++++++++++++++++++++++++++++++------- 2 files changed, 517 insertions(+), 61 deletions(-) diff --git a/db/db_versioned.go b/db/db_versioned.go index 0d356b9b0d..957569fec0 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -11,12 +11,15 @@ import ( "context" "fmt" "math" + "syscall" "github.com/pkg/errors" bolt "go.etcd.io/bbolt" + "go.uber.org/zap" "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" + "github.com/iotexproject/iotex-core/v2/pkg/log" "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" ) @@ -196,6 +199,241 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { return last, err } +// CommitToDB write a batch to DB, where the batch can contain keys for +// both versioned and non-versioned namespace +func (b *BoltDBVersioned) CommitToDB(version uint64, vns map[string]bool, kvsb batch.KVStoreBatch) error { + vnsize, ve, nve, err := dedup(vns, kvsb) + if err != nil { + return errors.Wrapf(err, "BoltDBVersioned failed to write batch") + } + return b.commitToDB(version, vnsize, ve, nve) +} + +func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, nve []*batch.WriteInfo) error { + var ( + err error + nonDBErr bool + ) + for c := uint8(0); c < b.db.config.NumRetries; c++ { + buckets := make(map[string]*bolt.Bucket) + if err = b.db.db.Update(func(tx *bolt.Tx) error { + // create/check metadata of all namespaces + for ns, size := range vnsize { + bucket, ok := buckets[ns] + if !ok { + bucket, err = tx.CreateBucketIfNotExists([]byte(ns)) + if err != nil { + return errors.Wrapf(err, "failed to create bucket %s", ns) + } + buckets[ns] = bucket + } + var vn *versionedNamespace + if val := bucket.Get(_minKey); val == nil { + // namespace not created yet + vn = &versionedNamespace{ + keyLen: uint32(size), + } + ve = append(ve, batch.NewWriteInfo( + batch.Put, ns, _minKey, vn.serialize(), + fmt.Sprintf("failed to create metadata for namespace %s", ns), + )) + } else { + if vn, err = deserializeVersionedNamespace(val); err != nil { + nonDBErr = true + return errors.Wrapf(err, "failed to get metadata of bucket %s", ns) + } + if vn.keyLen != uint32(size) { + nonDBErr = true + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size) + } + } + } + // keep order of the writes same as the original batch + for i := len(ve) - 1; i >= 0; i-- { + var ( + write = ve[i] + ns = write.Namespace() + key = write.Key() + val = write.Value() + ) + // get bucket + bucket, ok := buckets[ns] + if !ok { + bucket, err = tx.CreateBucketIfNotExists([]byte(ns)) + if err != nil { + return errors.Wrapf(err, "failed to create bucket %s", ns) + } + buckets[ns] = bucket + } + // check key's last version + var ( + last uint64 + notexist bool + maxKey = keyForWrite(key, math.MaxUint64) + ) + c := bucket.Cursor() + k, _ := c.Seek(maxKey) + if k == nil || bytes.Compare(k, maxKey) == 1 { + k, _ = c.Prev() + if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 { + // cursor is at the beginning/end of the bucket or smaller than minimum key + notexist = true + } + } + if !notexist { + _, last = parseKey(k) + } + switch write.WriteType() { + case batch.Put: + if bytes.Equal(key, _minKey) { + // create namespace + if err = bucket.Put(key, val); err != nil { + return errors.Wrap(err, write.Error()) + } + } else { + // wrong-size key should be caught in dedup(), but check anyway + if vnsize[ns] != len(key) { + panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) + } + if !notexist && version <= last { + // not allowed to perform write on an earlier version + nonDBErr = true + return ErrInvalid + } + if err = bucket.Put(keyForWrite(key, version), val); err != nil { + return errors.Wrap(err, write.Error()) + } + } + case batch.Delete: + if notexist { + continue + } + // wrong-size key should be caught in dedup(), but check anyway + if vnsize[ns] != len(key) { + panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) + } + if version < last { + // not allowed to perform delete on an earlier version + nonDBErr = true + return ErrInvalid + } + if err = bucket.Put(keyForDelete(key, version), nil); err != nil { + return errors.Wrap(err, write.Error()) + } + if err = bucket.Delete(keyForWrite(key, version)); err != nil { + return errors.Wrap(err, write.Error()) + } + } + } + // write non-versioned keys + for i := len(nve) - 1; i >= 0; i-- { + var ( + write = nve[i] + ns = write.Namespace() + ) + switch write.WriteType() { + case batch.Put: + // get bucket + bucket, ok := buckets[ns] + if !ok { + bucket, err = tx.CreateBucketIfNotExists([]byte(ns)) + if err != nil { + return errors.Wrapf(err, "failed to create bucket %s", ns) + } + buckets[ns] = bucket + } + if err = bucket.Put(write.Key(), write.Value()); err != nil { + return errors.Wrap(err, write.Error()) + } + case batch.Delete: + bucket := tx.Bucket([]byte(ns)) + if bucket == nil { + continue + } + if err = bucket.Delete(write.Key()); err != nil { + return errors.Wrap(err, write.Error()) + } + } + } + return nil + }); err == nil || nonDBErr { + break + } + } + if nonDBErr { + return err + } + if err != nil { + if errors.Is(err, syscall.ENOSPC) { + log.L().Fatal("BoltDBVersioned failed to write batch", zap.Error(err)) + } + return errors.Wrap(ErrIO, err.Error()) + } + return nil +} + +// dedup does 3 things: +// 1. deduplicate entries in the batch, only keep the last write for each key +// 2. splits entries into 2 slices according to the input namespace map +// 3. return a map of input namespace's keyLength +func dedup(vns map[string]bool, kvsb batch.KVStoreBatch) (map[string]int, []*batch.WriteInfo, []*batch.WriteInfo, error) { + kvsb.Lock() + defer kvsb.Unlock() + + type doubleKey struct { + ns string + key string + } + + var ( + entryKeySet = make(map[doubleKey]bool) + nsKeyLen = make(map[string]int) + nsInMap = make([]*batch.WriteInfo, 0) + other = make([]*batch.WriteInfo, 0) + pickAll = len(vns) == 0 + ) + for i := kvsb.Size() - 1; i >= 0; i-- { + write, e := kvsb.Entry(i) + if e != nil { + return nil, nil, nil, e + } + // only handle Put and Delete + var ( + writeType = write.WriteType() + ns = write.Namespace() + key = write.Key() + ) + if writeType != batch.Put && writeType != batch.Delete { + continue + } + k := doubleKey{ns: ns, key: string(key)} + if entryKeySet[k] { + continue + } + if writeType == batch.Put { + // for a later DELETE, we want to capture the earlier PUT + // otherwise, the DELETE might return not-exist + entryKeySet[k] = true + } + if pickAll || vns[k.ns] { + nsInMap = append(nsInMap, write) + } else { + other = append(other, write) + } + // check key size + if pickAll || vns[k.ns] { + if n, ok := nsKeyLen[k.ns]; !ok { + nsKeyLen[k.ns] = len(write.Key()) + } else { + if n != len(write.Key()) { + return nil, nil, nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", n, len(write.Key())) + } + } + } + } + return nsKeyLen, nsInMap, other, nil +} + func isNotExist(err error) bool { return err == ErrNotExist || err == ErrBucketNotExist } diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index d13e9afec0..e0c8fd3bb0 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -8,14 +8,21 @@ package db import ( "context" + "math" "testing" "github.com/pkg/errors" "github.com/stretchr/testify/require" + "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/testutil" ) +var ( + _k5 = []byte("key_5") + _k10 = []byte("key_10") +) + type versionTest struct { ns string k, v []byte @@ -50,10 +57,6 @@ func TestVersionedDB(t *testing.T) { r.NoError(err) r.EqualValues(len(_k2), vn.keyLen) // check more Put/Get - var ( - _k5 = []byte("key_5") - _k10 = []byte("key_10") - ) err = db.Put(1, _bucket1, _k10, _v1) r.Equal("invalid key length, expecting 5, got 6: invalid input", err.Error()) r.NoError(db.Put(1, _bucket1, _k2, _v1)) @@ -239,6 +242,173 @@ func TestVersionedDB(t *testing.T) { } func TestMultipleWriteDelete(t *testing.T) { + r := require.New(t) + for i := 0; i < 2; i++ { + testPath, err := testutil.PathOfTempFile("test-version") + r.NoError(err) + defer func() { + testutil.CleanupPath(testPath) + }() + + cfg := DefaultConfig + cfg.DbPath = testPath + db := NewBoltDBVersioned(cfg) + ctx := context.Background() + r.NoError(db.Start(ctx)) + + if i == 0 { + // multiple writes and deletes + r.NoError(db.Put(1, _bucket1, _k2, _v1)) + r.NoError(db.Put(3, _bucket1, _k2, _v3)) + v, err := db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(3, v) + r.NoError(db.Delete(7, _bucket1, _k2)) + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(10, _bucket1, _k2, _v2)) + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(10, v) + r.NoError(db.Delete(15, _bucket1, _k2)) + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(18, _bucket1, _k2, _v3)) + r.NoError(db.Delete(18, _bucket1, _k2)) + r.NoError(db.Put(18, _bucket1, _k2, _v3)) + r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(21, _bucket1, _k2, _v4)) + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(21, v) + r.NoError(db.Delete(25, _bucket1, _k2)) + r.NoError(db.Put(25, _bucket1, _k2, _k2)) + r.NoError(db.Delete(25, _bucket1, _k2)) + r.NoError(db.Put(25, _bucket1, _k2, _k2)) // write-after-delete + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(25, v) + } else { + // multiple writes and deletes using commitToDB + b := batch.NewBatch() + for _, e := range []versionTest{ + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, nil, 7, ErrDeleted}, + {_bucket1, _k2, _v2, 10, nil}, + {_bucket1, _k2, nil, 15, ErrDeleted}, + {_bucket1, _k2, _v3, 18, ErrDeleted}, // delete-after-write + {_bucket1, _k2, _v4, 21, nil}, + {_bucket1, _k2, _k2, 25, nil}, // write-after-delete + } { + if e.height == 7 || e.height == 15 { + b.Delete(e.ns, e.k, "test") + } else if e.height == 18 { + b.Put(e.ns, e.k, e.v, "test") + b.Delete(e.ns, e.k, "test") + b.Put(e.ns, e.k, e.v, "test") + b.Delete(e.ns, e.k, "test") + } else if e.height == 25 { + b.Delete(e.ns, e.k, "test") + b.Put(e.ns, e.k, e.v, "test") + b.Delete(e.ns, e.k, "test") + b.Put(e.ns, e.k, e.v, "test") + } else { + b.Put(e.ns, e.k, e.v, "test") + } + r.NoError(db.CommitToDB(e.height, nil, b)) + b.Clear() + v, err := db.Version(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + if err == nil { + r.EqualValues(e.height, v) + } + } + } + for _, e := range []versionTest{ + {_bucket1, _k2, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 6, nil}, + {_bucket1, _k2, nil, 7, ErrDeleted}, + {_bucket1, _k2, nil, 9, ErrDeleted}, + {_bucket1, _k2, _v2, 10, nil}, + {_bucket1, _k2, _v2, 14, nil}, + {_bucket1, _k2, nil, 15, ErrDeleted}, + {_bucket1, _k2, nil, 17, ErrDeleted}, + {_bucket1, _k2, nil, 18, ErrDeleted}, + {_bucket1, _k2, nil, 20, ErrDeleted}, + {_bucket1, _k2, _v4, 21, nil}, + {_bucket1, _k2, _v4, 22, nil}, + {_bucket1, _k2, _v4, 24, nil}, + {_bucket1, _k2, _k2, 25, nil}, + {_bucket1, _k2, _k2, 26, nil}, + {_bucket1, _k2, _k2, 25000, nil}, + } { + value, err := db.Get(e.height, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + r.NoError(db.Stop(ctx)) + } +} + +func TestDedup(t *testing.T) { + r := require.New(t) + + b := batch.NewBatch() + for _, e := range []versionTest{ + {_bucket2, _v1, _v2, 0, nil}, + {_bucket2, _v2, _v3, 9, nil}, + {_bucket2, _v3, _v4, 3, nil}, + {_bucket2, _v4, _v1, 1, nil}, + {_bucket1, _k1, _v1, 0, nil}, + {_bucket1, _k2, _v2, 9, nil}, + {_bucket1, _k3, _v3, 3, nil}, + {_bucket1, _k4, _v4, 1, nil}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + keySize, ve, ce, err := dedup(nil, b) + r.NoError(err) + r.Equal(2, len(keySize)) + r.Equal(5, keySize[_bucket1]) + r.Equal(7, keySize[_bucket2]) + r.Equal(8, len(ve)) + r.Zero(len(ce)) + for i, v := range [][]byte{_k4, _k3, _k2, _k1, _v4, _v3, _v2, _v1} { + r.Equal(v, ve[i].Key()) + } + // put a key with diff length into _bucket2 + b.Put(_bucket2, _k1, _v1, "test") + // treat _bucket1 as versioned namespace still OK + keySize, ve, ce, err = dedup(map[string]bool{ + _bucket1: true, + }, b) + r.NoError(err) + r.Equal(1, len(keySize)) + r.Equal(5, keySize[_bucket1]) + r.Equal(4, len(ve)) + r.Equal(5, len(ce)) + for i, v := range [][]byte{_k4, _k3, _k2, _k1} { + r.Equal(v, ve[i].Key()) + } + for i, v := range [][]byte{_k1, _v4, _v3, _v2, _v1} { + r.Equal(v, ce[i].Key()) + } + // treat _bucket2 (or both buckets) as versioned namespace hits error due to diff key size + for _, v := range []map[string]bool{ + {_bucket2: true}, nil, + } { + _, _, _, err = dedup(v, b) + r.Equal("invalid key length, expecting 5, got 7: invalid input", err.Error()) + } +} + +func TestCommitToDB(t *testing.T) { r := require.New(t) testPath, err := testutil.PathOfTempFile("test-version") r.NoError(err) @@ -255,68 +425,116 @@ func TestMultipleWriteDelete(t *testing.T) { db.Stop(ctx) }() - // multiple writes and deletes - r.NoError(db.Put(1, _bucket1, _k2, _v1)) - r.NoError(db.Put(3, _bucket1, _k2, _v3)) - v, err := db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(3, v) - r.NoError(db.Delete(7, _bucket1, _k2)) - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(10, _bucket1, _k2, _v2)) - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(10, v) - r.NoError(db.Delete(15, _bucket1, _k2)) - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(18, _bucket1, _k2, _v3)) - r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(18, _bucket1, _k2, _v3)) // write again - value, err := db.Get(18, _bucket1, _k2) - r.NoError(err) - r.Equal(_v3, value) - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(18, v) - r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(21, _bucket1, _k2, _v4)) - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(21, v) - r.NoError(db.Delete(25, _bucket1, _k2)) - r.NoError(db.Put(25, _bucket1, _k2, _k2)) // write-after-delete - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(25, v) + b := batch.NewBatch() for _, e := range []versionTest{ + {_bucket2, _v1, _k1, 0, nil}, + {_bucket2, _v2, _k2, 9, nil}, + {_bucket2, _v3, _k3, 3, nil}, + {_bucket1, _k1, _v1, 0, nil}, + {_bucket1, _k2, _v2, 9, nil}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + + r.NoError(db.CommitToDB(1, nil, b)) + b.Clear() + for _, e := range []versionTest{ + {_bucket2, _v1, nil, 0, ErrNotExist}, + {_bucket2, _v2, nil, 0, ErrNotExist}, + {_bucket2, _v3, nil, 0, ErrNotExist}, + {_bucket2, _v4, nil, 0, ErrNotExist}, + {_bucket2, _v1, _k1, 1, nil}, + {_bucket2, _v2, _k2, 1, nil}, + {_bucket2, _v3, _k3, 1, nil}, + {_bucket2, _v1, _k1, 2, nil}, + {_bucket2, _v2, _k2, 2, nil}, + {_bucket2, _v3, _k3, 2, nil}, + {_bucket2, _v4, nil, 2, ErrNotExist}, + {_bucket1, _k1, nil, 0, ErrNotExist}, {_bucket1, _k2, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 6, nil}, - {_bucket1, _k2, nil, 7, ErrDeleted}, - {_bucket1, _k2, nil, 9, ErrDeleted}, - {_bucket1, _k2, _v2, 10, nil}, - {_bucket1, _k2, _v2, 14, nil}, - {_bucket1, _k2, nil, 15, ErrDeleted}, - {_bucket1, _k2, nil, 17, ErrDeleted}, - {_bucket1, _k2, nil, 18, ErrDeleted}, - {_bucket1, _k2, nil, 20, ErrDeleted}, - {_bucket1, _k2, _v4, 21, nil}, - {_bucket1, _k2, _v4, 22, nil}, - {_bucket1, _k2, _v4, 24, nil}, - {_bucket1, _k2, _k2, 25, nil}, - {_bucket1, _k2, _k2, 26, nil}, - {_bucket1, _k2, _k2, 25000, nil}, + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 0, ErrNotExist}, + {_bucket1, _k1, _v1, 1, nil}, + {_bucket1, _k2, _v2, 1, nil}, + {_bucket1, _k1, _v1, 3, nil}, + {_bucket1, _k2, _v2, 3, nil}, + {_bucket1, _k3, nil, 3, ErrNotExist}, + {_bucket1, _k4, nil, 3, ErrNotExist}, + } { + value, err := db.Get(e.height, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + + // batch with wrong key length would fail + b.Put(_bucket1, _v1, _k1, "test") + r.Equal(ErrInvalid, errors.Cause(db.CommitToDB(3, nil, b))) + b.Clear() + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 0, nil}, + {_bucket1, _k2, _v3, 9, nil}, + {_bucket1, _k3, _v1, 3, nil}, + {_bucket1, _k4, _v2, 1, nil}, + {_bucket2, _v1, _k3, 0, nil}, + {_bucket2, _v2, _k2, 9, nil}, + {_bucket2, _v3, _k1, 3, nil}, + {_bucket2, _v4, _k4, 1, nil}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + b.Delete(_bucket1, _k3, "test") + b.Delete(_bucket2, _v3, "test") + + r.NoError(db.CommitToDB(5, nil, b)) + b.Clear() + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, nil, 0, ErrNotExist}, + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 0, ErrNotExist}, + {_bucket2, _v1, nil, 0, ErrNotExist}, + {_bucket2, _v2, nil, 0, ErrNotExist}, + {_bucket2, _v3, nil, 0, ErrNotExist}, + {_bucket2, _v4, nil, 0, ErrNotExist}, } { value, err := db.Get(e.height, e.ns, e.k) r.Equal(e.err, errors.Cause(err)) r.Equal(e.v, value) } + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 1, nil}, + {_bucket1, _k2, _v2, 1, nil}, + {_bucket1, _k3, nil, 1, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, + {_bucket2, _v1, _k1, 1, nil}, + {_bucket2, _v2, _k2, 1, nil}, + {_bucket2, _v3, _k3, 1, nil}, + {_bucket2, _v4, nil, 1, ErrNotExist}, + } { + for _, h := range []uint64{1, 2, 3, 4} { + value, err := db.Get(h, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + } + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 5, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k3, nil, 5, ErrDeleted}, + {_bucket1, _k4, _v2, 5, nil}, + {_bucket2, _v1, _k3, 5, nil}, + {_bucket2, _v2, _k2, 5, nil}, + {_bucket2, _v3, nil, 5, ErrDeleted}, + {_bucket2, _v4, _k4, 5, nil}, + } { + for _, h := range []uint64{5, 16, 64, 3000, math.MaxUint64} { + value, err := db.Get(h, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + } + // cannot write to earlier version + b.Put(_bucket1, _k1, _v2, "test") + b.Put(_bucket1, _k2, _v1, "test") + r.ErrorIs(db.CommitToDB(4, nil, b), ErrInvalid) } From 05260d433ad6c8ac7f38f3104fa53aae0681e8f0 Mon Sep 17 00:00:00 2001 From: dustinxie <dahuaxie@gmail.com> Date: Sun, 15 Dec 2024 11:13:14 -0800 Subject: [PATCH 2/9] address coment --- db/db_versioned.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/db/db_versioned.go b/db/db_versioned.go index 957569fec0..d8686a403f 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -259,11 +259,7 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, // get bucket bucket, ok := buckets[ns] if !ok { - bucket, err = tx.CreateBucketIfNotExists([]byte(ns)) - if err != nil { - return errors.Wrapf(err, "failed to create bucket %s", ns) - } - buckets[ns] = bucket + panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), vns = %s does not exist", ns)) } // check key's last version var ( @@ -286,7 +282,7 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, switch write.WriteType() { case batch.Put: if bytes.Equal(key, _minKey) { - // create namespace + // create metadata for namespace if err = bucket.Put(key, val); err != nil { return errors.Wrap(err, write.Error()) } From 38cc2ec140b8f4706f95bcb894309e018359f94b Mon Sep 17 00:00:00 2001 From: dustinxie <dahuaxie@gmail.com> Date: Thu, 19 Dec 2024 16:44:38 -0800 Subject: [PATCH 3/9] add AddVersionedNamespace() method --- db/db_versioned.go | 69 +++++++++++++++++++++++------------------ db/db_versioned_test.go | 23 +++++++++----- 2 files changed, 55 insertions(+), 37 deletions(-) diff --git a/db/db_versioned.go b/db/db_versioned.go index d8686a403f..d01ad667cd 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -44,6 +44,9 @@ type ( // Base returns the underlying KVStore Base() KVStore + // AddVersionedNamespace adds a versioned namespace + AddVersionedNamespace(string, uint32) error + // Version returns the key's most recent version Version(string, []byte) (uint64, error) } @@ -77,6 +80,24 @@ func (b *BoltDBVersioned) Base() KVStore { return b.db } +// AddVersionedNamespace adds a versioned namespace +func (b *BoltDBVersioned) AddVersionedNamespace(ns string, keyLen uint32) error { + vn, err := b.checkNamespace(ns) + if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist { + // create metadata for namespace + return b.db.Put(ns, _minKey, (&versionedNamespace{ + keyLen: keyLen, + }).serialize()) + } + if err != nil { + return err + } + if vn.keyLen != keyLen { + return errors.Wrapf(ErrInvalid, "namespace %s already exists with key length = %d, got %d", ns, vn.keyLen, keyLen) + } + return nil +} + // Put writes a <key, value> record func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) error { if !b.db.IsReady() { @@ -87,23 +108,19 @@ func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) erro if err != nil { return err } - buf := batch.NewBatch() - if vn == nil { - // namespace not yet created - buf.Put(ns, _minKey, (&versionedNamespace{ - keyLen: uint32(len(key)), - }).serialize(), "failed to create metadata") - } else { - if len(key) != int(vn.keyLen) { - return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) - } - last, _, err := b.get(math.MaxUint64, ns, key) - if !isNotExist(err) && version < last { - // not allowed to perform write on an earlier version - return ErrInvalid - } - buf.Delete(ns, keyForDelete(key, version), fmt.Sprintf("failed to delete key %x", key)) + if len(key) != int(vn.keyLen) { + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) + } + last, _, err := b.get(math.MaxUint64, ns, key) + if !isNotExist(err) && version < last { + // not allowed to perform write on an earlier version + return errors.Wrapf(ErrInvalid, "cannot write at earlier version %d", version) + } + if version != last { + return b.db.Put(ns, keyForWrite(key, version), value) } + buf := batch.NewBatch() + buf.Delete(ns, keyForDelete(key, version), fmt.Sprintf("failed to delete key %x", key)) buf.Put(ns, keyForWrite(key, version), value, fmt.Sprintf("failed to put key %x", key)) return b.db.WriteBatch(buf) } @@ -174,7 +191,10 @@ func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error { } if version < last { // not allowed to perform delete on an earlier version - return ErrInvalid + return errors.Wrapf(ErrInvalid, "cannot delete at earlier version %d", version) + } + if version != last { + return b.db.Put(ns, keyForDelete(key, version), nil) } buf := batch.NewBatch() buf.Put(ns, keyForDelete(key, version), nil, fmt.Sprintf("failed to delete key %x", key)) @@ -455,18 +475,10 @@ func parseKey(key []byte) (bool, uint64) { func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) { data, err := b.db.Get(ns, _minKey) - switch errors.Cause(err) { - case nil: - vn, err := deserializeVersionedNamespace(data) - if err != nil { - return nil, err - } - return vn, nil - case ErrNotExist, ErrBucketNotExist: - return nil, nil - default: + if err != nil { return nil, err } + return deserializeVersionedNamespace(data) } func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) error { @@ -474,9 +486,6 @@ func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) error { if err != nil { return err } - if vn == nil { - return errors.Wrapf(ErrNotExist, "namespace = %x doesn't exist", ns) - } if len(key) != int(vn.keyLen) { return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) } diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index e0c8fd3bb0..283b7e818b 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -50,15 +50,18 @@ func TestVersionedDB(t *testing.T) { // namespace and key does not exist vn, err := db.checkNamespace(_bucket1) r.Nil(vn) - r.Nil(err) - // write first key, namespace and key now exist + r.ErrorContains(err, ErrNotExist.Error()) + // create namespace + r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k2)))) + r.ErrorContains(db.AddVersionedNamespace(_bucket1, 8), "namespace test_ns1 already exists with key length = 5, got 8") + // write first key r.NoError(db.Put(0, _bucket1, _k2, _v2)) vn, err = db.checkNamespace(_bucket1) r.NoError(err) r.EqualValues(len(_k2), vn.keyLen) // check more Put/Get err = db.Put(1, _bucket1, _k10, _v1) - r.Equal("invalid key length, expecting 5, got 6: invalid input", err.Error()) + r.ErrorContains(err, "invalid key length, expecting 5, got 6: invalid input") r.NoError(db.Put(1, _bucket1, _k2, _v1)) r.NoError(db.Put(3, _bucket1, _k2, _v3)) r.NoError(db.Put(6, _bucket1, _k2, _v2)) @@ -94,8 +97,8 @@ func TestVersionedDB(t *testing.T) { r.NoError(db.Put(6, _bucket1, _k2, _v4)) r.NoError(db.Put(7, _bucket1, _k4, _v4)) // write to earlier version again is invalid - r.Equal(ErrInvalid, db.Put(3, _bucket1, _k2, _v4)) - r.Equal(ErrInvalid, db.Put(4, _bucket1, _k4, _v4)) + r.ErrorContains(db.Put(3, _bucket1, _k2, _v4), "cannot write at earlier version 3: invalid input") + r.ErrorContains(db.Put(4, _bucket1, _k4, _v4), "cannot write at earlier version 4: invalid input") // write with same value r.NoError(db.Put(9, _bucket1, _k2, _v4)) r.NoError(db.Put(10, _bucket1, _k4, _v4)) @@ -145,6 +148,7 @@ func TestVersionedDB(t *testing.T) { r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket2, _k1))) for _, k := range [][]byte{_k2, _k4} { r.NoError(db.Delete(11, _bucket1, k)) + r.ErrorContains(db.Delete(10, _bucket1, k), "cannot delete at earlier version 10") } for _, k := range [][]byte{_k1, _k3, _k5} { r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket1, k))) @@ -181,8 +185,8 @@ func TestVersionedDB(t *testing.T) { r.Equal(e.v, value) } // write before delete version is invalid - r.Equal(ErrInvalid, db.Put(9, _bucket1, _k2, _k2)) - r.Equal(ErrInvalid, db.Put(9, _bucket1, _k4, _k4)) + r.ErrorContains(db.Put(9, _bucket1, _k2, _k2), "cannot write at earlier version 9: invalid input") + r.ErrorContains(db.Put(9, _bucket1, _k4, _k4), "cannot write at earlier version 9: invalid input") for _, e := range []versionTest{ {_bucket1, _k2, _v4, 10, nil}, // before delete version {_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version @@ -256,6 +260,8 @@ func TestMultipleWriteDelete(t *testing.T) { ctx := context.Background() r.NoError(db.Start(ctx)) + // create namespace + r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k2)))) if i == 0 { // multiple writes and deletes r.NoError(db.Put(1, _bucket1, _k2, _v1)) @@ -436,6 +442,9 @@ func TestCommitToDB(t *testing.T) { b.Put(e.ns, e.k, e.v, "test") } + // create namespace + r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k1)))) + r.NoError(db.AddVersionedNamespace(_bucket2, uint32(len(_v1)))) r.NoError(db.CommitToDB(1, nil, b)) b.Clear() for _, e := range []versionTest{ From 532fd1787051e51763141ee9d2d4adb95f978fc8 Mon Sep 17 00:00:00 2001 From: dustinxie <dahuaxie@gmail.com> Date: Thu, 19 Dec 2024 16:46:08 -0800 Subject: [PATCH 4/9] refactor commitToDB() --- db/db_versioned.go | 144 +++++++++++++++++++--------------------- db/db_versioned_test.go | 1 + 2 files changed, 69 insertions(+), 76 deletions(-) diff --git a/db/db_versioned.go b/db/db_versioned.go index d01ad667cd..86db62e062 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -247,25 +247,20 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, } buckets[ns] = bucket } - var vn *versionedNamespace - if val := bucket.Get(_minKey); val == nil { + val := bucket.Get(_minKey) + if val == nil { // namespace not created yet - vn = &versionedNamespace{ - keyLen: uint32(size), - } - ve = append(ve, batch.NewWriteInfo( - batch.Put, ns, _minKey, vn.serialize(), - fmt.Sprintf("failed to create metadata for namespace %s", ns), - )) - } else { - if vn, err = deserializeVersionedNamespace(val); err != nil { - nonDBErr = true - return errors.Wrapf(err, "failed to get metadata of bucket %s", ns) - } - if vn.keyLen != uint32(size) { - nonDBErr = true - return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size) - } + nonDBErr = true + return errors.Wrapf(ErrInvalid, "namespace %s has not been added", ns) + } + vn, err := deserializeVersionedNamespace(val) + if err != nil { + nonDBErr = true + return errors.Wrapf(err, "failed to get metadata of bucket %s", ns) + } + if vn.keyLen != uint32(size) { + nonDBErr = true + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size) } } // keep order of the writes same as the original batch @@ -274,71 +269,19 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, write = ve[i] ns = write.Namespace() key = write.Key() - val = write.Value() ) // get bucket bucket, ok := buckets[ns] if !ok { panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), vns = %s does not exist", ns)) } - // check key's last version - var ( - last uint64 - notexist bool - maxKey = keyForWrite(key, math.MaxUint64) - ) - c := bucket.Cursor() - k, _ := c.Seek(maxKey) - if k == nil || bytes.Compare(k, maxKey) == 1 { - k, _ = c.Prev() - if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 { - // cursor is at the beginning/end of the bucket or smaller than minimum key - notexist = true - } + // wrong-size key should be caught in dedup(), but check anyway + if vnsize[ns] != len(key) { + panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) } - if !notexist { - _, last = parseKey(k) - } - switch write.WriteType() { - case batch.Put: - if bytes.Equal(key, _minKey) { - // create metadata for namespace - if err = bucket.Put(key, val); err != nil { - return errors.Wrap(err, write.Error()) - } - } else { - // wrong-size key should be caught in dedup(), but check anyway - if vnsize[ns] != len(key) { - panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) - } - if !notexist && version <= last { - // not allowed to perform write on an earlier version - nonDBErr = true - return ErrInvalid - } - if err = bucket.Put(keyForWrite(key, version), val); err != nil { - return errors.Wrap(err, write.Error()) - } - } - case batch.Delete: - if notexist { - continue - } - // wrong-size key should be caught in dedup(), but check anyway - if vnsize[ns] != len(key) { - panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) - } - if version < last { - // not allowed to perform delete on an earlier version - nonDBErr = true - return ErrInvalid - } - if err = bucket.Put(keyForDelete(key, version), nil); err != nil { - return errors.Wrap(err, write.Error()) - } - if err = bucket.Delete(keyForWrite(key, version)); err != nil { - return errors.Wrap(err, write.Error()) - } + nonDBErr, err = writeVersionedEntry(version, bucket, write) + if err != nil { + return err } } // write non-versioned keys @@ -388,6 +331,55 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, return nil } +func writeVersionedEntry(version uint64, bucket *bolt.Bucket, ve *batch.WriteInfo) (bool, error) { + var ( + key = ve.Key() + val = ve.Value() + last uint64 + notexist bool + maxKey = keyForWrite(key, math.MaxUint64) + ) + c := bucket.Cursor() + k, _ := c.Seek(maxKey) + if k == nil || bytes.Compare(k, maxKey) == 1 { + k, _ = c.Prev() + if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 { + // cursor is at the beginning/end of the bucket or smaller than minimum key + notexist = true + } + } + if !notexist { + _, last = parseKey(k) + } + switch ve.WriteType() { + case batch.Put: + if !notexist && version <= last { + // not allowed to perform write on an earlier version + return true, errors.Wrapf(ErrInvalid, "cannot write at earlier version %d", version) + } + if err := bucket.Put(keyForWrite(key, version), val); err != nil { + return false, errors.Wrap(err, ve.Error()) + } + case batch.Delete: + if notexist { + return false, nil + } + if version < last { + // not allowed to perform delete on an earlier version + return true, errors.Wrapf(ErrInvalid, "cannot delete at earlier version %d", version) + } + if err := bucket.Put(keyForDelete(key, version), nil); err != nil { + return false, errors.Wrap(err, ve.Error()) + } + if version == last { + if err := bucket.Delete(keyForWrite(key, version)); err != nil { + return false, errors.Wrap(err, ve.Error()) + } + } + } + return false, nil +} + // dedup does 3 things: // 1. deduplicate entries in the batch, only keep the last write for each key // 2. splits entries into 2 slices according to the input namespace map diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index 283b7e818b..7c62e0e117 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -441,6 +441,7 @@ func TestCommitToDB(t *testing.T) { } { b.Put(e.ns, e.k, e.v, "test") } + r.ErrorContains(db.CommitToDB(1, nil, b), "has not been added") // create namespace r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k1)))) From 31d07c1cc06505225426aedd3f88853490ca7c14 Mon Sep 17 00:00:00 2001 From: dustinxie <dahuaxie@gmail.com> Date: Wed, 8 Jan 2025 16:34:03 -0800 Subject: [PATCH 5/9] move versioned map to db level --- db/db_versioned.go | 75 +++++++++++++++++++++++++++-------------- db/db_versioned_test.go | 41 ++++++++++------------ 2 files changed, 68 insertions(+), 48 deletions(-) diff --git a/db/db_versioned.go b/db/db_versioned.go index 86db62e062..a4ff39ff13 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -41,8 +41,8 @@ type ( // Delete deletes a record by (namespace, key) Delete(uint64, string, []byte) error - // Base returns the underlying KVStore - Base() KVStore + // Filter returns <k, v> pair in a bucket that meet the condition + Filter(string, Condition, []byte, []byte) ([][]byte, [][]byte, error) // AddVersionedNamespace adds a versioned namespace AddVersionedNamespace(string, uint32) error @@ -53,14 +53,16 @@ type ( // BoltDBVersioned is KvVersioned implementation based on bolt DB BoltDBVersioned struct { - db *BoltDB + db *BoltDB + vns map[string]int // map of versioned namespace } ) // NewBoltDBVersioned instantiates an BoltDB which implements VersionedDB func NewBoltDBVersioned(cfg Config) *BoltDBVersioned { b := BoltDBVersioned{ - db: NewBoltDB(cfg), + db: NewBoltDB(cfg), + vns: make(map[string]int), } return &b } @@ -75,19 +77,18 @@ func (b *BoltDBVersioned) Stop(ctx context.Context) error { return b.db.Stop(ctx) } -// Base returns the underlying KVStore -func (b *BoltDBVersioned) Base() KVStore { - return b.db -} - // AddVersionedNamespace adds a versioned namespace func (b *BoltDBVersioned) AddVersionedNamespace(ns string, keyLen uint32) error { vn, err := b.checkNamespace(ns) if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist { // create metadata for namespace - return b.db.Put(ns, _minKey, (&versionedNamespace{ + if err = b.db.Put(ns, _minKey, (&versionedNamespace{ keyLen: keyLen, - }).serialize()) + }).serialize()); err != nil { + return err + } + b.vns[ns] = int(keyLen) + return nil } if err != nil { return err @@ -95,6 +96,7 @@ func (b *BoltDBVersioned) AddVersionedNamespace(ns string, keyLen uint32) error if vn.keyLen != keyLen { return errors.Wrapf(ErrInvalid, "namespace %s already exists with key length = %d, got %d", ns, vn.keyLen, keyLen) } + b.vns[ns] = int(keyLen) return nil } @@ -103,6 +105,9 @@ func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) erro if !b.db.IsReady() { return ErrDBNotStarted } + if _, ok := b.vns[ns]; !ok { + return b.db.Put(ns, key, value) + } // check namespace vn, err := b.checkNamespace(ns) if err != nil { @@ -130,6 +135,9 @@ func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, er if !b.db.IsReady() { return nil, ErrDBNotStarted } + if _, ok := b.vns[ns]; !ok { + return b.db.Get(ns, key) + } // check key's metadata if err := b.checkNamespaceAndKey(ns, key); err != nil { return nil, err @@ -181,6 +189,9 @@ func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error { if !b.db.IsReady() { return ErrDBNotStarted } + if _, ok := b.vns[ns]; !ok { + return b.db.Delete(ns, key) + } // check key's metadata if err := b.checkNamespaceAndKey(ns, key); err != nil { return err @@ -207,6 +218,9 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { if !b.db.IsReady() { return 0, ErrDBNotStarted } + if _, ok := b.vns[ns]; !ok { + return 0, errors.Errorf("namespace %s is non-versioned", ns) + } // check key's metadata if err := b.checkNamespaceAndKey(ns, key); err != nil { return 0, err @@ -219,14 +233,22 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { return last, err } +// Filter returns <k, v> pair in a bucket that meet the condition +func (b *BoltDBVersioned) Filter(ns string, cond Condition, minKey, maxKey []byte) ([][]byte, [][]byte, error) { + if _, ok := b.vns[ns]; ok { + panic("Filter not supported for versioned DB") + } + return b.db.Filter(ns, cond, minKey, maxKey) +} + // CommitToDB write a batch to DB, where the batch can contain keys for // both versioned and non-versioned namespace -func (b *BoltDBVersioned) CommitToDB(version uint64, vns map[string]bool, kvsb batch.KVStoreBatch) error { - vnsize, ve, nve, err := dedup(vns, kvsb) +func (b *BoltDBVersioned) CommitToDB(version uint64, kvsb batch.KVStoreBatch) error { + ve, nve, err := dedup(b.vns, kvsb) if err != nil { return errors.Wrapf(err, "BoltDBVersioned failed to write batch") } - return b.commitToDB(version, vnsize, ve, nve) + return b.commitToDB(version, b.vns, ve, nve) } func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, nve []*batch.WriteInfo) error { @@ -384,7 +406,7 @@ func writeVersionedEntry(version uint64, bucket *bolt.Bucket, ve *batch.WriteInf // 1. deduplicate entries in the batch, only keep the last write for each key // 2. splits entries into 2 slices according to the input namespace map // 3. return a map of input namespace's keyLength -func dedup(vns map[string]bool, kvsb batch.KVStoreBatch) (map[string]int, []*batch.WriteInfo, []*batch.WriteInfo, error) { +func dedup(vns map[string]int, kvsb batch.KVStoreBatch) ([]*batch.WriteInfo, []*batch.WriteInfo, error) { kvsb.Lock() defer kvsb.Unlock() @@ -403,7 +425,7 @@ func dedup(vns map[string]bool, kvsb batch.KVStoreBatch) (map[string]int, []*bat for i := kvsb.Size() - 1; i >= 0; i-- { write, e := kvsb.Entry(i) if e != nil { - return nil, nil, nil, e + return nil, nil, e } // only handle Put and Delete var ( @@ -423,23 +445,26 @@ func dedup(vns map[string]bool, kvsb batch.KVStoreBatch) (map[string]int, []*bat // otherwise, the DELETE might return not-exist entryKeySet[k] = true } - if pickAll || vns[k.ns] { - nsInMap = append(nsInMap, write) - } else { - other = append(other, write) - } - // check key size - if pickAll || vns[k.ns] { + if pickAll { if n, ok := nsKeyLen[k.ns]; !ok { nsKeyLen[k.ns] = len(write.Key()) } else { if n != len(write.Key()) { - return nil, nil, nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", n, len(write.Key())) + return nil, nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", n, len(write.Key())) } } + nsInMap = append(nsInMap, write) + } else if keyLen := vns[k.ns]; keyLen > 0 { + // verify key size + if keyLen != len(write.Key()) { + return nil, nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", keyLen, len(write.Key())) + } + nsInMap = append(nsInMap, write) + } else { + other = append(other, write) } } - return nsKeyLen, nsInMap, other, nil + return nsInMap, other, nil } func isNotExist(err error) bool { diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index 7c62e0e117..deb986109e 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -144,11 +144,13 @@ func TestVersionedDB(t *testing.T) { r.Equal(e.err, errors.Cause(err)) r.Equal(e.height, value) } + v, err := db.Version(_bucket2, _k1) + r.Zero(v) + r.ErrorContains(err, "namespace test_ns2 is non-versioned") // test delete - r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket2, _k1))) for _, k := range [][]byte{_k2, _k4} { r.NoError(db.Delete(11, _bucket1, k)) - r.ErrorContains(db.Delete(10, _bucket1, k), "cannot delete at earlier version 10") + r.ErrorContains(db.Delete(10, _bucket1, k), "cannot delete at earlier version 10: invalid input") } for _, k := range [][]byte{_k1, _k3, _k5} { r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket1, k))) @@ -243,6 +245,8 @@ func TestVersionedDB(t *testing.T) { r.Equal(e.err, errors.Cause(err)) r.Equal(e.height, value) } + // test filter + r.PanicsWithValue("Filter not supported for versioned DB", func() { db.Filter(_bucket1, nil, nil, nil) }) } func TestMultipleWriteDelete(t *testing.T) { @@ -324,7 +328,7 @@ func TestMultipleWriteDelete(t *testing.T) { } else { b.Put(e.ns, e.k, e.v, "test") } - r.NoError(db.CommitToDB(e.height, nil, b)) + r.NoError(db.CommitToDB(e.height, b)) b.Clear() v, err := db.Version(e.ns, e.k) r.Equal(e.err, errors.Cause(err)) @@ -378,11 +382,8 @@ func TestDedup(t *testing.T) { } { b.Put(e.ns, e.k, e.v, "test") } - keySize, ve, ce, err := dedup(nil, b) + ve, ce, err := dedup(nil, b) r.NoError(err) - r.Equal(2, len(keySize)) - r.Equal(5, keySize[_bucket1]) - r.Equal(7, keySize[_bucket2]) r.Equal(8, len(ve)) r.Zero(len(ce)) for i, v := range [][]byte{_k4, _k3, _k2, _k1, _v4, _v3, _v2, _v1} { @@ -391,12 +392,8 @@ func TestDedup(t *testing.T) { // put a key with diff length into _bucket2 b.Put(_bucket2, _k1, _v1, "test") // treat _bucket1 as versioned namespace still OK - keySize, ve, ce, err = dedup(map[string]bool{ - _bucket1: true, - }, b) + ve, ce, err = dedup(map[string]int{_bucket1: 5}, b) r.NoError(err) - r.Equal(1, len(keySize)) - r.Equal(5, keySize[_bucket1]) r.Equal(4, len(ve)) r.Equal(5, len(ce)) for i, v := range [][]byte{_k4, _k3, _k2, _k1} { @@ -406,12 +403,10 @@ func TestDedup(t *testing.T) { r.Equal(v, ce[i].Key()) } // treat _bucket2 (or both buckets) as versioned namespace hits error due to diff key size - for _, v := range []map[string]bool{ - {_bucket2: true}, nil, - } { - _, _, _, err = dedup(v, b) - r.Equal("invalid key length, expecting 5, got 7: invalid input", err.Error()) - } + _, _, err = dedup(map[string]int{_bucket2: 7}, b) + r.ErrorContains(err, "invalid key length, expecting 7, got 5: invalid input") + _, _, err = dedup(nil, b) + r.ErrorContains(err, "invalid key length, expecting 5, got 7: invalid input") } func TestCommitToDB(t *testing.T) { @@ -441,12 +436,12 @@ func TestCommitToDB(t *testing.T) { } { b.Put(e.ns, e.k, e.v, "test") } - r.ErrorContains(db.CommitToDB(1, nil, b), "has not been added") + r.PanicsWithValue("BoltDBVersioned.commitToDB(), vns = test_ns2 does not exist", func() { db.CommitToDB(1, b) }) // create namespace r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k1)))) r.NoError(db.AddVersionedNamespace(_bucket2, uint32(len(_v1)))) - r.NoError(db.CommitToDB(1, nil, b)) + r.NoError(db.CommitToDB(1, b)) b.Clear() for _, e := range []versionTest{ {_bucket2, _v1, nil, 0, ErrNotExist}, @@ -478,7 +473,7 @@ func TestCommitToDB(t *testing.T) { // batch with wrong key length would fail b.Put(_bucket1, _v1, _k1, "test") - r.Equal(ErrInvalid, errors.Cause(db.CommitToDB(3, nil, b))) + r.Equal(ErrInvalid, errors.Cause(db.CommitToDB(3, b))) b.Clear() for _, e := range []versionTest{ {_bucket1, _k1, _v1, 0, nil}, @@ -495,7 +490,7 @@ func TestCommitToDB(t *testing.T) { b.Delete(_bucket1, _k3, "test") b.Delete(_bucket2, _v3, "test") - r.NoError(db.CommitToDB(5, nil, b)) + r.NoError(db.CommitToDB(5, b)) b.Clear() for _, e := range []versionTest{ {_bucket1, _k1, nil, 0, ErrNotExist}, @@ -546,5 +541,5 @@ func TestCommitToDB(t *testing.T) { // cannot write to earlier version b.Put(_bucket1, _k1, _v2, "test") b.Put(_bucket1, _k2, _v1, "test") - r.ErrorIs(db.CommitToDB(4, nil, b), ErrInvalid) + r.ErrorIs(db.CommitToDB(4, b), ErrInvalid) } From 36e7382c18969151c11765f57eb0828db02a5032 Mon Sep 17 00:00:00 2001 From: dustinxie <dahuaxie@gmail.com> Date: Thu, 9 Jan 2025 16:44:11 -0800 Subject: [PATCH 6/9] Get() to return wrap(ErrNotExist, key deleted) --- db/db_versioned.go | 3 + db/db_versioned_test.go | 482 ++++++++++++++++++++++------------------ 2 files changed, 271 insertions(+), 214 deletions(-) diff --git a/db/db_versioned.go b/db/db_versioned.go index a4ff39ff13..86466febf6 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -143,6 +143,9 @@ func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, er return nil, err } _, v, err := b.get(version, ns, key) + if err == ErrDeleted { + err = errors.Wrapf(ErrNotExist, "key %x deleted", key) + } return v, err } diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index deb986109e..05a31dde00 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -19,15 +19,17 @@ import ( ) var ( - _k5 = []byte("key_5") - _k10 = []byte("key_10") + _k5 = []byte("key_5") + _k10 = []byte("key_10") + _errNotExist = ErrNotExist.Error() + _errDeleted = "deleted: not exist in DB" ) type versionTest struct { ns string k, v []byte height uint64 - err error + err string } func TestVersionedDB(t *testing.T) { @@ -69,28 +71,32 @@ func TestVersionedDB(t *testing.T) { r.NoError(db.Put(4, _bucket1, _k4, _v1)) r.NoError(db.Put(7, _bucket1, _k4, _v3)) for _, e := range []versionTest{ - {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v2, 0, nil}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 5, nil}, - {_bucket1, _k2, _v2, 6, nil}, - {_bucket1, _k2, _v2, 7, nil}, // after last write version - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version - {_bucket1, _k4, _v2, 2, nil}, - {_bucket1, _k4, _v2, 3, nil}, - {_bucket1, _k4, _v1, 4, nil}, - {_bucket1, _k4, _v1, 6, nil}, - {_bucket1, _k4, _v3, 7, nil}, - {_bucket1, _k4, _v3, 8, nil}, // larger than last key in bucket - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket2, _k1, nil, 0, _errNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v2, 6, ""}, + {_bucket1, _k2, _v2, 7, ""}, // after last write version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v3, 7, ""}, + {_bucket1, _k4, _v3, 8, ""}, // larger than last key in bucket + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } // overwrite the same height again @@ -103,45 +109,53 @@ func TestVersionedDB(t *testing.T) { r.NoError(db.Put(9, _bucket1, _k2, _v4)) r.NoError(db.Put(10, _bucket1, _k4, _v4)) for _, e := range []versionTest{ - {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v2, 0, nil}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 5, nil}, - {_bucket1, _k2, _v4, 6, nil}, - {_bucket1, _k2, _v4, 8, nil}, - {_bucket1, _k2, _v4, 9, nil}, - {_bucket1, _k2, _v4, 10, nil}, // after last write version - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version - {_bucket1, _k4, _v2, 2, nil}, - {_bucket1, _k4, _v2, 3, nil}, - {_bucket1, _k4, _v1, 4, nil}, - {_bucket1, _k4, _v1, 6, nil}, - {_bucket1, _k4, _v4, 7, nil}, - {_bucket1, _k4, _v4, 9, nil}, - {_bucket1, _k4, _v4, 10, nil}, - {_bucket1, _k4, _v4, 11, nil}, // larger than last key in bucket - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket2, _k1, nil, 0, _errNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v4, 6, ""}, + {_bucket1, _k2, _v4, 8, ""}, + {_bucket1, _k2, _v4, 9, ""}, + {_bucket1, _k2, _v4, 10, ""}, // after last write version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v4, 7, ""}, + {_bucket1, _k4, _v4, 9, ""}, + {_bucket1, _k4, _v4, 10, ""}, + {_bucket1, _k4, _v4, 11, ""}, // larger than last key in bucket + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } // check version for _, e := range []versionTest{ - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, nil, 9, nil}, - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 10, nil}, - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 9, ""}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 10, ""}, + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Version(e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.height, value) } v, err := db.Version(_bucket2, _k1) @@ -158,91 +172,107 @@ func TestVersionedDB(t *testing.T) { r.Equal(ErrInvalid, errors.Cause(db.Delete(10, _bucket1, _k10))) // key still can be read before delete version for _, e := range []versionTest{ - {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v2, 0, nil}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 5, nil}, - {_bucket1, _k2, _v4, 6, nil}, - {_bucket1, _k2, _v4, 8, nil}, - {_bucket1, _k2, _v4, 9, nil}, - {_bucket1, _k2, _v4, 10, nil}, // before delete version - {_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version - {_bucket1, _k4, _v2, 2, nil}, - {_bucket1, _k4, _v2, 3, nil}, - {_bucket1, _k4, _v1, 4, nil}, - {_bucket1, _k4, _v1, 6, nil}, - {_bucket1, _k4, _v4, 7, nil}, - {_bucket1, _k4, _v4, 10, nil}, // before delete version - {_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket2, _k1, nil, 0, _errNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v4, 6, ""}, + {_bucket1, _k2, _v4, 8, ""}, + {_bucket1, _k2, _v4, 9, ""}, + {_bucket1, _k2, _v4, 10, ""}, // before delete version + {_bucket1, _k2, nil, 11, _errDeleted}, // after delete version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v4, 7, ""}, + {_bucket1, _k4, _v4, 10, ""}, // before delete version + {_bucket1, _k4, nil, 11, _errDeleted}, // after delete version + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } // write before delete version is invalid r.ErrorContains(db.Put(9, _bucket1, _k2, _k2), "cannot write at earlier version 9: invalid input") r.ErrorContains(db.Put(9, _bucket1, _k4, _k4), "cannot write at earlier version 9: invalid input") for _, e := range []versionTest{ - {_bucket1, _k2, _v4, 10, nil}, // before delete version - {_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version - {_bucket1, _k4, _v4, 10, nil}, // before delete version - {_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version + {_bucket1, _k2, _v4, 10, ""}, // before delete version + {_bucket1, _k2, nil, 11, _errDeleted}, // after delete version + {_bucket1, _k4, _v4, 10, ""}, // before delete version + {_bucket1, _k4, nil, 11, _errDeleted}, // after delete version } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } // write after delete version r.NoError(db.Put(12, _bucket1, _k2, _k2)) r.NoError(db.Put(12, _bucket1, _k4, _k4)) for _, e := range []versionTest{ - {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v2, 0, nil}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 5, nil}, - {_bucket1, _k2, _v4, 6, nil}, - {_bucket1, _k2, _v4, 8, nil}, - {_bucket1, _k2, _v4, 10, nil}, // before delete version - {_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version - {_bucket1, _k2, _k2, 12, nil}, // after next write version - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version - {_bucket1, _k4, _v2, 2, nil}, - {_bucket1, _k4, _v2, 3, nil}, - {_bucket1, _k4, _v1, 4, nil}, - {_bucket1, _k4, _v1, 6, nil}, - {_bucket1, _k4, _v4, 7, nil}, - {_bucket1, _k4, _v4, 10, nil}, // before delete version - {_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version - {_bucket1, _k4, _k4, 12, nil}, // after next write version - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket2, _k1, nil, 0, _errNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v4, 6, ""}, + {_bucket1, _k2, _v4, 8, ""}, + {_bucket1, _k2, _v4, 10, ""}, // before delete version + {_bucket1, _k2, nil, 11, _errDeleted}, // after delete version + {_bucket1, _k2, _k2, 12, ""}, // after next write version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v4, 7, ""}, + {_bucket1, _k4, _v4, 10, ""}, // before delete version + {_bucket1, _k4, nil, 11, _errDeleted}, // after delete version + {_bucket1, _k4, _k4, 12, ""}, // after next write version + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } // check version after delete for _, e := range []versionTest{ - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, nil, 12, nil}, - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 12, nil}, - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 12, ""}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 12, ""}, + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Version(e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.height, value) } // test filter @@ -304,14 +334,14 @@ func TestMultipleWriteDelete(t *testing.T) { // multiple writes and deletes using commitToDB b := batch.NewBatch() for _, e := range []versionTest{ - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, nil, 7, ErrDeleted}, - {_bucket1, _k2, _v2, 10, nil}, - {_bucket1, _k2, nil, 15, ErrDeleted}, - {_bucket1, _k2, _v3, 18, ErrDeleted}, // delete-after-write - {_bucket1, _k2, _v4, 21, nil}, - {_bucket1, _k2, _k2, 25, nil}, // write-after-delete + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, nil, 7, ErrDeleted.Error()}, + {_bucket1, _k2, _v2, 10, ""}, + {_bucket1, _k2, nil, 15, ErrDeleted.Error()}, + {_bucket1, _k2, _v3, 18, ErrDeleted.Error()}, // delete-after-write + {_bucket1, _k2, _v4, 21, ""}, + {_bucket1, _k2, _k2, 25, ""}, // write-after-delete } { if e.height == 7 || e.height == 15 { b.Delete(e.ns, e.k, "test") @@ -331,35 +361,43 @@ func TestMultipleWriteDelete(t *testing.T) { r.NoError(db.CommitToDB(e.height, b)) b.Clear() v, err := db.Version(e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } if err == nil { r.EqualValues(e.height, v) } } } for _, e := range []versionTest{ - {_bucket1, _k2, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 6, nil}, - {_bucket1, _k2, nil, 7, ErrDeleted}, - {_bucket1, _k2, nil, 9, ErrDeleted}, - {_bucket1, _k2, _v2, 10, nil}, - {_bucket1, _k2, _v2, 14, nil}, - {_bucket1, _k2, nil, 15, ErrDeleted}, - {_bucket1, _k2, nil, 17, ErrDeleted}, - {_bucket1, _k2, nil, 18, ErrDeleted}, - {_bucket1, _k2, nil, 20, ErrDeleted}, - {_bucket1, _k2, _v4, 21, nil}, - {_bucket1, _k2, _v4, 22, nil}, - {_bucket1, _k2, _v4, 24, nil}, - {_bucket1, _k2, _k2, 25, nil}, - {_bucket1, _k2, _k2, 26, nil}, - {_bucket1, _k2, _k2, 25000, nil}, + {_bucket1, _k2, nil, 0, _errNotExist}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 6, ""}, + {_bucket1, _k2, nil, 7, _errDeleted}, + {_bucket1, _k2, nil, 9, _errDeleted}, + {_bucket1, _k2, _v2, 10, ""}, + {_bucket1, _k2, _v2, 14, ""}, + {_bucket1, _k2, nil, 15, _errDeleted}, + {_bucket1, _k2, nil, 17, _errDeleted}, + {_bucket1, _k2, nil, 18, _errDeleted}, + {_bucket1, _k2, nil, 20, _errDeleted}, + {_bucket1, _k2, _v4, 21, ""}, + {_bucket1, _k2, _v4, 22, ""}, + {_bucket1, _k2, _v4, 24, ""}, + {_bucket1, _k2, _k2, 25, ""}, + {_bucket1, _k2, _k2, 26, ""}, + {_bucket1, _k2, _k2, 25000, ""}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } r.NoError(db.Stop(ctx)) @@ -371,14 +409,14 @@ func TestDedup(t *testing.T) { b := batch.NewBatch() for _, e := range []versionTest{ - {_bucket2, _v1, _v2, 0, nil}, - {_bucket2, _v2, _v3, 9, nil}, - {_bucket2, _v3, _v4, 3, nil}, - {_bucket2, _v4, _v1, 1, nil}, - {_bucket1, _k1, _v1, 0, nil}, - {_bucket1, _k2, _v2, 9, nil}, - {_bucket1, _k3, _v3, 3, nil}, - {_bucket1, _k4, _v4, 1, nil}, + {_bucket2, _v1, _v2, 0, ""}, + {_bucket2, _v2, _v3, 9, ""}, + {_bucket2, _v3, _v4, 3, ""}, + {_bucket2, _v4, _v1, 1, ""}, + {_bucket1, _k1, _v1, 0, ""}, + {_bucket1, _k2, _v2, 9, ""}, + {_bucket1, _k3, _v3, 3, ""}, + {_bucket1, _k4, _v4, 1, ""}, } { b.Put(e.ns, e.k, e.v, "test") } @@ -428,11 +466,11 @@ func TestCommitToDB(t *testing.T) { b := batch.NewBatch() for _, e := range []versionTest{ - {_bucket2, _v1, _k1, 0, nil}, - {_bucket2, _v2, _k2, 9, nil}, - {_bucket2, _v3, _k3, 3, nil}, - {_bucket1, _k1, _v1, 0, nil}, - {_bucket1, _k2, _v2, 9, nil}, + {_bucket2, _v1, _k1, 0, ""}, + {_bucket2, _v2, _k2, 9, ""}, + {_bucket2, _v3, _k3, 3, ""}, + {_bucket1, _k1, _v1, 0, ""}, + {_bucket1, _k2, _v2, 9, ""}, } { b.Put(e.ns, e.k, e.v, "test") } @@ -444,30 +482,34 @@ func TestCommitToDB(t *testing.T) { r.NoError(db.CommitToDB(1, b)) b.Clear() for _, e := range []versionTest{ - {_bucket2, _v1, nil, 0, ErrNotExist}, - {_bucket2, _v2, nil, 0, ErrNotExist}, - {_bucket2, _v3, nil, 0, ErrNotExist}, - {_bucket2, _v4, nil, 0, ErrNotExist}, - {_bucket2, _v1, _k1, 1, nil}, - {_bucket2, _v2, _k2, 1, nil}, - {_bucket2, _v3, _k3, 1, nil}, - {_bucket2, _v1, _k1, 2, nil}, - {_bucket2, _v2, _k2, 2, nil}, - {_bucket2, _v3, _k3, 2, nil}, - {_bucket2, _v4, nil, 2, ErrNotExist}, - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, nil, 0, ErrNotExist}, - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 0, ErrNotExist}, - {_bucket1, _k1, _v1, 1, nil}, - {_bucket1, _k2, _v2, 1, nil}, - {_bucket1, _k1, _v1, 3, nil}, - {_bucket1, _k2, _v2, 3, nil}, - {_bucket1, _k3, nil, 3, ErrNotExist}, - {_bucket1, _k4, nil, 3, ErrNotExist}, + {_bucket2, _v1, nil, 0, _errNotExist}, + {_bucket2, _v2, nil, 0, _errNotExist}, + {_bucket2, _v3, nil, 0, _errNotExist}, + {_bucket2, _v4, nil, 0, _errNotExist}, + {_bucket2, _v1, _k1, 1, ""}, + {_bucket2, _v2, _k2, 1, ""}, + {_bucket2, _v3, _k3, 1, ""}, + {_bucket2, _v1, _k1, 2, ""}, + {_bucket2, _v2, _k2, 2, ""}, + {_bucket2, _v3, _k3, 2, ""}, + {_bucket2, _v4, nil, 2, _errNotExist}, + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 0, _errNotExist}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 0, _errNotExist}, + {_bucket1, _k1, _v1, 1, ""}, + {_bucket1, _k2, _v2, 1, ""}, + {_bucket1, _k1, _v1, 3, ""}, + {_bucket1, _k2, _v2, 3, ""}, + {_bucket1, _k3, nil, 3, _errNotExist}, + {_bucket1, _k4, nil, 3, _errNotExist}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } @@ -476,14 +518,14 @@ func TestCommitToDB(t *testing.T) { r.Equal(ErrInvalid, errors.Cause(db.CommitToDB(3, b))) b.Clear() for _, e := range []versionTest{ - {_bucket1, _k1, _v1, 0, nil}, - {_bucket1, _k2, _v3, 9, nil}, - {_bucket1, _k3, _v1, 3, nil}, - {_bucket1, _k4, _v2, 1, nil}, - {_bucket2, _v1, _k3, 0, nil}, - {_bucket2, _v2, _k2, 9, nil}, - {_bucket2, _v3, _k1, 3, nil}, - {_bucket2, _v4, _k4, 1, nil}, + {_bucket1, _k1, _v1, 0, ""}, + {_bucket1, _k2, _v3, 9, ""}, + {_bucket1, _k3, _v1, 3, ""}, + {_bucket1, _k4, _v2, 1, ""}, + {_bucket2, _v1, _k3, 0, ""}, + {_bucket2, _v2, _k2, 9, ""}, + {_bucket2, _v3, _k1, 3, ""}, + {_bucket2, _v4, _k4, 1, ""}, } { b.Put(e.ns, e.k, e.v, "test") } @@ -493,48 +535,60 @@ func TestCommitToDB(t *testing.T) { r.NoError(db.CommitToDB(5, b)) b.Clear() for _, e := range []versionTest{ - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, nil, 0, ErrNotExist}, - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 0, ErrNotExist}, - {_bucket2, _v1, nil, 0, ErrNotExist}, - {_bucket2, _v2, nil, 0, ErrNotExist}, - {_bucket2, _v3, nil, 0, ErrNotExist}, - {_bucket2, _v4, nil, 0, ErrNotExist}, + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 0, _errNotExist}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 0, _errNotExist}, + {_bucket2, _v1, nil, 0, _errNotExist}, + {_bucket2, _v2, nil, 0, _errNotExist}, + {_bucket2, _v3, nil, 0, _errNotExist}, + {_bucket2, _v4, nil, 0, _errNotExist}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } for _, e := range []versionTest{ - {_bucket1, _k1, _v1, 1, nil}, - {_bucket1, _k2, _v2, 1, nil}, - {_bucket1, _k3, nil, 1, ErrNotExist}, - {_bucket1, _k4, nil, 1, ErrNotExist}, - {_bucket2, _v1, _k1, 1, nil}, - {_bucket2, _v2, _k2, 1, nil}, - {_bucket2, _v3, _k3, 1, nil}, - {_bucket2, _v4, nil, 1, ErrNotExist}, + {_bucket1, _k1, _v1, 1, ""}, + {_bucket1, _k2, _v2, 1, ""}, + {_bucket1, _k3, nil, 1, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, + {_bucket2, _v1, _k1, 1, ""}, + {_bucket2, _v2, _k2, 1, ""}, + {_bucket2, _v3, _k3, 1, ""}, + {_bucket2, _v4, nil, 1, _errNotExist}, } { for _, h := range []uint64{1, 2, 3, 4} { value, err := db.Get(h, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } } for _, e := range []versionTest{ - {_bucket1, _k1, _v1, 5, nil}, - {_bucket1, _k2, _v3, 5, nil}, - {_bucket1, _k3, nil, 5, ErrDeleted}, - {_bucket1, _k4, _v2, 5, nil}, - {_bucket2, _v1, _k3, 5, nil}, - {_bucket2, _v2, _k2, 5, nil}, - {_bucket2, _v3, nil, 5, ErrDeleted}, - {_bucket2, _v4, _k4, 5, nil}, + {_bucket1, _k1, _v1, 5, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k3, nil, 5, _errDeleted}, + {_bucket1, _k4, _v2, 5, ""}, + {_bucket2, _v1, _k3, 5, ""}, + {_bucket2, _v2, _k2, 5, ""}, + {_bucket2, _v3, nil, 5, _errDeleted}, + {_bucket2, _v4, _k4, 5, ""}, } { for _, h := range []uint64{5, 16, 64, 3000, math.MaxUint64} { value, err := db.Get(h, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } } From fd96cbfb18adbbe127abd71f9587aeb161987eff Mon Sep 17 00:00:00 2001 From: dustinxie <dahuaxie@gmail.com> Date: Mon, 13 Jan 2025 19:44:07 -0800 Subject: [PATCH 7/9] address comment --- db/db_versioned.go | 89 ++++++++++++++--------------------------- db/db_versioned_test.go | 4 +- 2 files changed, 33 insertions(+), 60 deletions(-) diff --git a/db/db_versioned.go b/db/db_versioned.go index 86466febf6..3ea16ffea0 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -42,7 +42,7 @@ type ( Delete(uint64, string, []byte) error // Filter returns <k, v> pair in a bucket that meet the condition - Filter(string, Condition, []byte, []byte) ([][]byte, [][]byte, error) + Filter(uint64, string, Condition, []byte, []byte) ([][]byte, [][]byte, error) // AddVersionedNamespace adds a versioned namespace AddVersionedNamespace(string, uint32) error @@ -105,16 +105,13 @@ func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) erro if !b.db.IsReady() { return ErrDBNotStarted } - if _, ok := b.vns[ns]; !ok { + keyLen, ok := b.vns[ns] + if !ok { return b.db.Put(ns, key, value) } - // check namespace - vn, err := b.checkNamespace(ns) - if err != nil { - return err - } - if len(key) != int(vn.keyLen) { - return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) + // check key length + if len(key) != keyLen { + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", keyLen, len(key)) } last, _, err := b.get(math.MaxUint64, ns, key) if !isNotExist(err) && version < last { @@ -135,15 +132,16 @@ func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, er if !b.db.IsReady() { return nil, ErrDBNotStarted } - if _, ok := b.vns[ns]; !ok { + keyLen, ok := b.vns[ns] + if !ok { return b.db.Get(ns, key) } - // check key's metadata - if err := b.checkNamespaceAndKey(ns, key); err != nil { - return nil, err + // check key length + if len(key) != keyLen { + return nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", keyLen, len(key)) } _, v, err := b.get(version, ns, key) - if err == ErrDeleted { + if errors.Cause(err) == ErrDeleted { err = errors.Wrapf(ErrNotExist, "key %x deleted", key) } return v, err @@ -192,16 +190,17 @@ func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error { if !b.db.IsReady() { return ErrDBNotStarted } - if _, ok := b.vns[ns]; !ok { + keyLen, ok := b.vns[ns] + if !ok { return b.db.Delete(ns, key) } - // check key's metadata - if err := b.checkNamespaceAndKey(ns, key); err != nil { - return err + // check key length + if len(key) != keyLen { + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", keyLen, len(key)) } last, _, err := b.get(math.MaxUint64, ns, key) if isNotExist(err) { - return err + return nil } if version < last { // not allowed to perform delete on an earlier version @@ -221,12 +220,13 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { if !b.db.IsReady() { return 0, ErrDBNotStarted } - if _, ok := b.vns[ns]; !ok { + keyLen, ok := b.vns[ns] + if !ok { return 0, errors.Errorf("namespace %s is non-versioned", ns) } - // check key's metadata - if err := b.checkNamespaceAndKey(ns, key); err != nil { - return 0, err + // check key length + if len(key) != keyLen { + return 0, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", keyLen, len(key)) } last, _, err := b.get(math.MaxUint64, ns, key) if isNotExist(err) { @@ -237,7 +237,7 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { } // Filter returns <k, v> pair in a bucket that meet the condition -func (b *BoltDBVersioned) Filter(ns string, cond Condition, minKey, maxKey []byte) ([][]byte, [][]byte, error) { +func (b *BoltDBVersioned) Filter(version uint64, ns string, cond Condition, minKey, maxKey []byte) ([][]byte, [][]byte, error) { if _, ok := b.vns[ns]; ok { panic("Filter not supported for versioned DB") } @@ -251,10 +251,10 @@ func (b *BoltDBVersioned) CommitToDB(version uint64, kvsb batch.KVStoreBatch) er if err != nil { return errors.Wrapf(err, "BoltDBVersioned failed to write batch") } - return b.commitToDB(version, b.vns, ve, nve) + return b.commitToDB(version, ve, nve) } -func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, nve []*batch.WriteInfo) error { +func (b *BoltDBVersioned) commitToDB(version uint64, ve, nve []*batch.WriteInfo) error { var ( err error nonDBErr bool @@ -263,30 +263,14 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, buckets := make(map[string]*bolt.Bucket) if err = b.db.db.Update(func(tx *bolt.Tx) error { // create/check metadata of all namespaces - for ns, size := range vnsize { - bucket, ok := buckets[ns] - if !ok { - bucket, err = tx.CreateBucketIfNotExists([]byte(ns)) + for ns := range b.vns { + if _, ok := buckets[ns]; !ok { + bucket, err := tx.CreateBucketIfNotExists([]byte(ns)) if err != nil { return errors.Wrapf(err, "failed to create bucket %s", ns) } buckets[ns] = bucket } - val := bucket.Get(_minKey) - if val == nil { - // namespace not created yet - nonDBErr = true - return errors.Wrapf(ErrInvalid, "namespace %s has not been added", ns) - } - vn, err := deserializeVersionedNamespace(val) - if err != nil { - nonDBErr = true - return errors.Wrapf(err, "failed to get metadata of bucket %s", ns) - } - if vn.keyLen != uint32(size) { - nonDBErr = true - return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size) - } } // keep order of the writes same as the original batch for i := len(ve) - 1; i >= 0; i-- { @@ -301,8 +285,8 @@ func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), vns = %s does not exist", ns)) } // wrong-size key should be caught in dedup(), but check anyway - if vnsize[ns] != len(key) { - panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) + if b.vns[ns] != len(key) { + panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, b.vns[ns], len(key))) } nonDBErr, err = writeVersionedEntry(version, bucket, write) if err != nil { @@ -500,14 +484,3 @@ func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) } return deserializeVersionedNamespace(data) } - -func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) error { - vn, err := b.checkNamespace(ns) - if err != nil { - return err - } - if len(key) != int(vn.keyLen) { - return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) - } - return nil -} diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index 05a31dde00..8cb9091853 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -167,7 +167,7 @@ func TestVersionedDB(t *testing.T) { r.ErrorContains(db.Delete(10, _bucket1, k), "cannot delete at earlier version 10: invalid input") } for _, k := range [][]byte{_k1, _k3, _k5} { - r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket1, k))) + r.NoError(db.Delete(10, _bucket1, k)) } r.Equal(ErrInvalid, errors.Cause(db.Delete(10, _bucket1, _k10))) // key still can be read before delete version @@ -276,7 +276,7 @@ func TestVersionedDB(t *testing.T) { r.Equal(e.height, value) } // test filter - r.PanicsWithValue("Filter not supported for versioned DB", func() { db.Filter(_bucket1, nil, nil, nil) }) + r.PanicsWithValue("Filter not supported for versioned DB", func() { db.Filter(0, _bucket1, nil, nil, nil) }) } func TestMultipleWriteDelete(t *testing.T) { From bb02385f43807575ac8115f3ef7bc600a282dd32 Mon Sep 17 00:00:00 2001 From: dustinxie <dahuaxie@gmail.com> Date: Tue, 14 Jan 2025 17:03:01 -0800 Subject: [PATCH 8/9] change AddVersionedNamespace() to DB init option --- db/db_versioned.go | 63 +++++++++++++++++++++++++++-------------- db/db_versioned_test.go | 23 +++++---------- 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/db/db_versioned.go b/db/db_versioned.go index 3ea16ffea0..bbe96d1a98 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -44,9 +44,6 @@ type ( // Filter returns <k, v> pair in a bucket that meet the condition Filter(uint64, string, Condition, []byte, []byte) ([][]byte, [][]byte, error) - // AddVersionedNamespace adds a versioned namespace - AddVersionedNamespace(string, uint32) error - // Version returns the key's most recent version Version(string, []byte) (uint64, error) } @@ -56,20 +53,43 @@ type ( db *BoltDB vns map[string]int // map of versioned namespace } + + // Namespace specifies the name and key length of the versioned namespace + Namespace struct { + ns string + keyLen uint32 + } ) +// BoltDBVersionedOption sets option for BoltDBVersioned +type BoltDBVersionedOption func(*BoltDBVersioned) + +func VnsOption(ns ...Namespace) BoltDBVersionedOption { + return func(k *BoltDBVersioned) { + for _, v := range ns { + k.vns[v.ns] = int(v.keyLen) + } + } +} + // NewBoltDBVersioned instantiates an BoltDB which implements VersionedDB -func NewBoltDBVersioned(cfg Config) *BoltDBVersioned { +func NewBoltDBVersioned(cfg Config, opts ...BoltDBVersionedOption) *BoltDBVersioned { b := BoltDBVersioned{ db: NewBoltDB(cfg), vns: make(map[string]int), } + for _, opt := range opts { + opt(&b) + } return &b } // Start starts the DB func (b *BoltDBVersioned) Start(ctx context.Context) error { - return b.db.Start(ctx) + if err := b.db.Start(ctx); err != nil { + return err + } + return b.addVersionedNamespace() } // Stop stops the DB @@ -77,26 +97,25 @@ func (b *BoltDBVersioned) Stop(ctx context.Context) error { return b.db.Stop(ctx) } -// AddVersionedNamespace adds a versioned namespace -func (b *BoltDBVersioned) AddVersionedNamespace(ns string, keyLen uint32) error { - vn, err := b.checkNamespace(ns) - if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist { - // create metadata for namespace - if err = b.db.Put(ns, _minKey, (&versionedNamespace{ - keyLen: keyLen, - }).serialize()); err != nil { +func (b *BoltDBVersioned) addVersionedNamespace() error { + for ns, keyLen := range b.vns { + vn, err := b.checkNamespace(ns) + if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist { + // create metadata for namespace + if err = b.db.Put(ns, _minKey, (&versionedNamespace{ + keyLen: uint32(keyLen), + }).serialize()); err != nil { + return err + } + continue + } + if err != nil { return err } - b.vns[ns] = int(keyLen) - return nil - } - if err != nil { - return err - } - if vn.keyLen != keyLen { - return errors.Wrapf(ErrInvalid, "namespace %s already exists with key length = %d, got %d", ns, vn.keyLen, keyLen) + if vn.keyLen != uint32(keyLen) { + return errors.Wrapf(ErrInvalid, "namespace %s already exists with key length = %d, got %d", ns, vn.keyLen, keyLen) + } } - b.vns[ns] = int(keyLen) return nil } diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index 8cb9091853..733e9ceefe 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -42,20 +42,17 @@ func TestVersionedDB(t *testing.T) { cfg := DefaultConfig cfg.DbPath = testPath - db := NewBoltDBVersioned(cfg) + db := NewBoltDBVersioned(cfg, VnsOption(Namespace{_bucket1, uint32(len(_k2))})) ctx := context.Background() r.NoError(db.Start(ctx)) defer func() { db.Stop(ctx) }() - // namespace and key does not exist + // namespace created vn, err := db.checkNamespace(_bucket1) - r.Nil(vn) - r.ErrorContains(err, ErrNotExist.Error()) - // create namespace - r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k2)))) - r.ErrorContains(db.AddVersionedNamespace(_bucket1, 8), "namespace test_ns1 already exists with key length = 5, got 8") + r.NoError(err) + r.Equal(uint32(len(_k2)), vn.keyLen) // write first key r.NoError(db.Put(0, _bucket1, _k2, _v2)) vn, err = db.checkNamespace(_bucket1) @@ -290,12 +287,10 @@ func TestMultipleWriteDelete(t *testing.T) { cfg := DefaultConfig cfg.DbPath = testPath - db := NewBoltDBVersioned(cfg) + db := NewBoltDBVersioned(cfg, VnsOption(Namespace{_bucket1, uint32(len(_k2))})) ctx := context.Background() r.NoError(db.Start(ctx)) - // create namespace - r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k2)))) if i == 0 { // multiple writes and deletes r.NoError(db.Put(1, _bucket1, _k2, _v1)) @@ -457,7 +452,8 @@ func TestCommitToDB(t *testing.T) { cfg := DefaultConfig cfg.DbPath = testPath - db := NewBoltDBVersioned(cfg) + db := NewBoltDBVersioned(cfg, VnsOption( + Namespace{_bucket1, uint32(len(_k1))}, Namespace{_bucket2, uint32(len(_v1))})) ctx := context.Background() r.NoError(db.Start(ctx)) defer func() { @@ -474,11 +470,6 @@ func TestCommitToDB(t *testing.T) { } { b.Put(e.ns, e.k, e.v, "test") } - r.PanicsWithValue("BoltDBVersioned.commitToDB(), vns = test_ns2 does not exist", func() { db.CommitToDB(1, b) }) - - // create namespace - r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k1)))) - r.NoError(db.AddVersionedNamespace(_bucket2, uint32(len(_v1)))) r.NoError(db.CommitToDB(1, b)) b.Clear() for _, e := range []versionTest{ From 08add4e6f1c4836c26aa404b4295283566d81c31 Mon Sep 17 00:00:00 2001 From: dustinxie <dahuaxie@gmail.com> Date: Tue, 14 Jan 2025 23:25:01 -0800 Subject: [PATCH 9/9] rename CommitBatch --- db/db_versioned.go | 8 ++++---- db/db_versioned_test.go | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/db/db_versioned.go b/db/db_versioned.go index bbe96d1a98..8cf057d1f3 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -263,17 +263,17 @@ func (b *BoltDBVersioned) Filter(version uint64, ns string, cond Condition, minK return b.db.Filter(ns, cond, minKey, maxKey) } -// CommitToDB write a batch to DB, where the batch can contain keys for +// CommitBatch write a batch to DB, where the batch can contain keys for // both versioned and non-versioned namespace -func (b *BoltDBVersioned) CommitToDB(version uint64, kvsb batch.KVStoreBatch) error { +func (b *BoltDBVersioned) CommitBatch(version uint64, kvsb batch.KVStoreBatch) error { ve, nve, err := dedup(b.vns, kvsb) if err != nil { return errors.Wrapf(err, "BoltDBVersioned failed to write batch") } - return b.commitToDB(version, ve, nve) + return b.commitBatch(version, ve, nve) } -func (b *BoltDBVersioned) commitToDB(version uint64, ve, nve []*batch.WriteInfo) error { +func (b *BoltDBVersioned) commitBatch(version uint64, ve, nve []*batch.WriteInfo) error { var ( err error nonDBErr bool diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index 733e9ceefe..fb99637015 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -353,7 +353,7 @@ func TestMultipleWriteDelete(t *testing.T) { } else { b.Put(e.ns, e.k, e.v, "test") } - r.NoError(db.CommitToDB(e.height, b)) + r.NoError(db.CommitBatch(e.height, b)) b.Clear() v, err := db.Version(e.ns, e.k) if len(e.err) == 0 { @@ -442,7 +442,7 @@ func TestDedup(t *testing.T) { r.ErrorContains(err, "invalid key length, expecting 5, got 7: invalid input") } -func TestCommitToDB(t *testing.T) { +func TestCommitBatch(t *testing.T) { r := require.New(t) testPath, err := testutil.PathOfTempFile("test-version") r.NoError(err) @@ -470,7 +470,7 @@ func TestCommitToDB(t *testing.T) { } { b.Put(e.ns, e.k, e.v, "test") } - r.NoError(db.CommitToDB(1, b)) + r.NoError(db.CommitBatch(1, b)) b.Clear() for _, e := range []versionTest{ {_bucket2, _v1, nil, 0, _errNotExist}, @@ -506,7 +506,7 @@ func TestCommitToDB(t *testing.T) { // batch with wrong key length would fail b.Put(_bucket1, _v1, _k1, "test") - r.Equal(ErrInvalid, errors.Cause(db.CommitToDB(3, b))) + r.Equal(ErrInvalid, errors.Cause(db.CommitBatch(3, b))) b.Clear() for _, e := range []versionTest{ {_bucket1, _k1, _v1, 0, ""}, @@ -523,7 +523,7 @@ func TestCommitToDB(t *testing.T) { b.Delete(_bucket1, _k3, "test") b.Delete(_bucket2, _v3, "test") - r.NoError(db.CommitToDB(5, b)) + r.NoError(db.CommitBatch(5, b)) b.Clear() for _, e := range []versionTest{ {_bucket1, _k1, nil, 0, _errNotExist}, @@ -586,5 +586,5 @@ func TestCommitToDB(t *testing.T) { // cannot write to earlier version b.Put(_bucket1, _k1, _v2, "test") b.Put(_bucket1, _k2, _v1, "test") - r.ErrorIs(db.CommitToDB(4, b), ErrInvalid) + r.ErrorIs(db.CommitBatch(4, b), ErrInvalid) }