diff --git a/db/db_versioned.go b/db/db_versioned.go index 0d356b9b0d..8cf057d1f3 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -11,12 +11,15 @@ import ( "context" "fmt" "math" + "syscall" "github.com/pkg/errors" bolt "go.etcd.io/bbolt" + "go.uber.org/zap" "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" + "github.com/iotexproject/iotex-core/v2/pkg/log" "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" ) @@ -38,8 +41,8 @@ type ( // Delete deletes a record by (namespace, key) Delete(uint64, string, []byte) error - // Base returns the underlying KVStore - Base() KVStore + // Filter returns pair in a bucket that meet the condition + Filter(uint64, string, Condition, []byte, []byte) ([][]byte, [][]byte, error) // Version returns the key's most recent version Version(string, []byte) (uint64, error) @@ -47,21 +50,46 @@ type ( // BoltDBVersioned is KvVersioned implementation based on bolt DB BoltDBVersioned struct { - db *BoltDB + db *BoltDB + vns map[string]int // map of versioned namespace + } + + // Namespace specifies the name and key length of the versioned namespace + Namespace struct { + ns string + keyLen uint32 } ) +// BoltDBVersionedOption sets option for BoltDBVersioned +type BoltDBVersionedOption func(*BoltDBVersioned) + +func VnsOption(ns ...Namespace) BoltDBVersionedOption { + return func(k *BoltDBVersioned) { + for _, v := range ns { + k.vns[v.ns] = int(v.keyLen) + } + } +} + // NewBoltDBVersioned instantiates an BoltDB which implements VersionedDB -func NewBoltDBVersioned(cfg Config) *BoltDBVersioned { +func NewBoltDBVersioned(cfg Config, opts ...BoltDBVersionedOption) *BoltDBVersioned { b := BoltDBVersioned{ - db: NewBoltDB(cfg), + db: NewBoltDB(cfg), + vns: make(map[string]int), + } + for _, opt := range opts { + opt(&b) } return &b } // Start starts the DB func (b *BoltDBVersioned) Start(ctx context.Context) error { - return b.db.Start(ctx) + if err := b.db.Start(ctx); err != nil { + return err + } + return b.addVersionedNamespace() } // Stop stops the DB @@ -69,9 +97,26 @@ func (b *BoltDBVersioned) Stop(ctx context.Context) error { return b.db.Stop(ctx) } -// Base returns the underlying KVStore -func (b *BoltDBVersioned) Base() KVStore { - return b.db +func (b *BoltDBVersioned) addVersionedNamespace() error { + for ns, keyLen := range b.vns { + vn, err := b.checkNamespace(ns) + if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist { + // create metadata for namespace + if err = b.db.Put(ns, _minKey, (&versionedNamespace{ + keyLen: uint32(keyLen), + }).serialize()); err != nil { + return err + } + continue + } + if err != nil { + return err + } + if vn.keyLen != uint32(keyLen) { + return errors.Wrapf(ErrInvalid, "namespace %s already exists with key length = %d, got %d", ns, vn.keyLen, keyLen) + } + } + return nil } // Put writes a record @@ -79,28 +124,24 @@ func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) erro if !b.db.IsReady() { return ErrDBNotStarted } - // check namespace - vn, err := b.checkNamespace(ns) - if err != nil { - return err + keyLen, ok := b.vns[ns] + if !ok { + return b.db.Put(ns, key, value) } - buf := batch.NewBatch() - if vn == nil { - // namespace not yet created - buf.Put(ns, _minKey, (&versionedNamespace{ - keyLen: uint32(len(key)), - }).serialize(), "failed to create metadata") - } else { - if len(key) != int(vn.keyLen) { - return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) - } - last, _, err := b.get(math.MaxUint64, ns, key) - if !isNotExist(err) && version < last { - // not allowed to perform write on an earlier version - return ErrInvalid - } - buf.Delete(ns, keyForDelete(key, version), fmt.Sprintf("failed to delete key %x", key)) + // check key length + if len(key) != keyLen { + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", keyLen, len(key)) + } + last, _, err := b.get(math.MaxUint64, ns, key) + if !isNotExist(err) && version < last { + // not allowed to perform write on an earlier version + return errors.Wrapf(ErrInvalid, "cannot write at earlier version %d", version) + } + if version != last { + return b.db.Put(ns, keyForWrite(key, version), value) } + buf := batch.NewBatch() + buf.Delete(ns, keyForDelete(key, version), fmt.Sprintf("failed to delete key %x", key)) buf.Put(ns, keyForWrite(key, version), value, fmt.Sprintf("failed to put key %x", key)) return b.db.WriteBatch(buf) } @@ -110,11 +151,18 @@ func (b *BoltDBVersioned) Get(version uint64, ns string, key []byte) ([]byte, er if !b.db.IsReady() { return nil, ErrDBNotStarted } - // check key's metadata - if err := b.checkNamespaceAndKey(ns, key); err != nil { - return nil, err + keyLen, ok := b.vns[ns] + if !ok { + return b.db.Get(ns, key) + } + // check key length + if len(key) != keyLen { + return nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", keyLen, len(key)) } _, v, err := b.get(version, ns, key) + if errors.Cause(err) == ErrDeleted { + err = errors.Wrapf(ErrNotExist, "key %x deleted", key) + } return v, err } @@ -161,17 +209,24 @@ func (b *BoltDBVersioned) Delete(version uint64, ns string, key []byte) error { if !b.db.IsReady() { return ErrDBNotStarted } - // check key's metadata - if err := b.checkNamespaceAndKey(ns, key); err != nil { - return err + keyLen, ok := b.vns[ns] + if !ok { + return b.db.Delete(ns, key) + } + // check key length + if len(key) != keyLen { + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", keyLen, len(key)) } last, _, err := b.get(math.MaxUint64, ns, key) if isNotExist(err) { - return err + return nil } if version < last { // not allowed to perform delete on an earlier version - return ErrInvalid + return errors.Wrapf(ErrInvalid, "cannot delete at earlier version %d", version) + } + if version != last { + return b.db.Put(ns, keyForDelete(key, version), nil) } buf := batch.NewBatch() buf.Put(ns, keyForDelete(key, version), nil, fmt.Sprintf("failed to delete key %x", key)) @@ -184,9 +239,13 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { if !b.db.IsReady() { return 0, ErrDBNotStarted } - // check key's metadata - if err := b.checkNamespaceAndKey(ns, key); err != nil { - return 0, err + keyLen, ok := b.vns[ns] + if !ok { + return 0, errors.Errorf("namespace %s is non-versioned", ns) + } + // check key length + if len(key) != keyLen { + return 0, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", keyLen, len(key)) } last, _, err := b.get(math.MaxUint64, ns, key) if isNotExist(err) { @@ -196,6 +255,224 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { return last, err } +// Filter returns pair in a bucket that meet the condition +func (b *BoltDBVersioned) Filter(version uint64, ns string, cond Condition, minKey, maxKey []byte) ([][]byte, [][]byte, error) { + if _, ok := b.vns[ns]; ok { + panic("Filter not supported for versioned DB") + } + return b.db.Filter(ns, cond, minKey, maxKey) +} + +// CommitBatch write a batch to DB, where the batch can contain keys for +// both versioned and non-versioned namespace +func (b *BoltDBVersioned) CommitBatch(version uint64, kvsb batch.KVStoreBatch) error { + ve, nve, err := dedup(b.vns, kvsb) + if err != nil { + return errors.Wrapf(err, "BoltDBVersioned failed to write batch") + } + return b.commitBatch(version, ve, nve) +} + +func (b *BoltDBVersioned) commitBatch(version uint64, ve, nve []*batch.WriteInfo) error { + var ( + err error + nonDBErr bool + ) + for c := uint8(0); c < b.db.config.NumRetries; c++ { + buckets := make(map[string]*bolt.Bucket) + if err = b.db.db.Update(func(tx *bolt.Tx) error { + // create/check metadata of all namespaces + for ns := range b.vns { + if _, ok := buckets[ns]; !ok { + bucket, err := tx.CreateBucketIfNotExists([]byte(ns)) + if err != nil { + return errors.Wrapf(err, "failed to create bucket %s", ns) + } + buckets[ns] = bucket + } + } + // keep order of the writes same as the original batch + for i := len(ve) - 1; i >= 0; i-- { + var ( + write = ve[i] + ns = write.Namespace() + key = write.Key() + ) + // get bucket + bucket, ok := buckets[ns] + if !ok { + panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), vns = %s does not exist", ns)) + } + // wrong-size key should be caught in dedup(), but check anyway + if b.vns[ns] != len(key) { + panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, b.vns[ns], len(key))) + } + nonDBErr, err = writeVersionedEntry(version, bucket, write) + if err != nil { + return err + } + } + // write non-versioned keys + for i := len(nve) - 1; i >= 0; i-- { + var ( + write = nve[i] + ns = write.Namespace() + ) + switch write.WriteType() { + case batch.Put: + // get bucket + bucket, ok := buckets[ns] + if !ok { + bucket, err = tx.CreateBucketIfNotExists([]byte(ns)) + if err != nil { + return errors.Wrapf(err, "failed to create bucket %s", ns) + } + buckets[ns] = bucket + } + if err = bucket.Put(write.Key(), write.Value()); err != nil { + return errors.Wrap(err, write.Error()) + } + case batch.Delete: + bucket := tx.Bucket([]byte(ns)) + if bucket == nil { + continue + } + if err = bucket.Delete(write.Key()); err != nil { + return errors.Wrap(err, write.Error()) + } + } + } + return nil + }); err == nil || nonDBErr { + break + } + } + if nonDBErr { + return err + } + if err != nil { + if errors.Is(err, syscall.ENOSPC) { + log.L().Fatal("BoltDBVersioned failed to write batch", zap.Error(err)) + } + return errors.Wrap(ErrIO, err.Error()) + } + return nil +} + +func writeVersionedEntry(version uint64, bucket *bolt.Bucket, ve *batch.WriteInfo) (bool, error) { + var ( + key = ve.Key() + val = ve.Value() + last uint64 + notexist bool + maxKey = keyForWrite(key, math.MaxUint64) + ) + c := bucket.Cursor() + k, _ := c.Seek(maxKey) + if k == nil || bytes.Compare(k, maxKey) == 1 { + k, _ = c.Prev() + if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 { + // cursor is at the beginning/end of the bucket or smaller than minimum key + notexist = true + } + } + if !notexist { + _, last = parseKey(k) + } + switch ve.WriteType() { + case batch.Put: + if !notexist && version <= last { + // not allowed to perform write on an earlier version + return true, errors.Wrapf(ErrInvalid, "cannot write at earlier version %d", version) + } + if err := bucket.Put(keyForWrite(key, version), val); err != nil { + return false, errors.Wrap(err, ve.Error()) + } + case batch.Delete: + if notexist { + return false, nil + } + if version < last { + // not allowed to perform delete on an earlier version + return true, errors.Wrapf(ErrInvalid, "cannot delete at earlier version %d", version) + } + if err := bucket.Put(keyForDelete(key, version), nil); err != nil { + return false, errors.Wrap(err, ve.Error()) + } + if version == last { + if err := bucket.Delete(keyForWrite(key, version)); err != nil { + return false, errors.Wrap(err, ve.Error()) + } + } + } + return false, nil +} + +// dedup does 3 things: +// 1. deduplicate entries in the batch, only keep the last write for each key +// 2. splits entries into 2 slices according to the input namespace map +// 3. return a map of input namespace's keyLength +func dedup(vns map[string]int, kvsb batch.KVStoreBatch) ([]*batch.WriteInfo, []*batch.WriteInfo, error) { + kvsb.Lock() + defer kvsb.Unlock() + + type doubleKey struct { + ns string + key string + } + + var ( + entryKeySet = make(map[doubleKey]bool) + nsKeyLen = make(map[string]int) + nsInMap = make([]*batch.WriteInfo, 0) + other = make([]*batch.WriteInfo, 0) + pickAll = len(vns) == 0 + ) + for i := kvsb.Size() - 1; i >= 0; i-- { + write, e := kvsb.Entry(i) + if e != nil { + return nil, nil, e + } + // only handle Put and Delete + var ( + writeType = write.WriteType() + ns = write.Namespace() + key = write.Key() + ) + if writeType != batch.Put && writeType != batch.Delete { + continue + } + k := doubleKey{ns: ns, key: string(key)} + if entryKeySet[k] { + continue + } + if writeType == batch.Put { + // for a later DELETE, we want to capture the earlier PUT + // otherwise, the DELETE might return not-exist + entryKeySet[k] = true + } + if pickAll { + if n, ok := nsKeyLen[k.ns]; !ok { + nsKeyLen[k.ns] = len(write.Key()) + } else { + if n != len(write.Key()) { + return nil, nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", n, len(write.Key())) + } + } + nsInMap = append(nsInMap, write) + } else if keyLen := vns[k.ns]; keyLen > 0 { + // verify key size + if keyLen != len(write.Key()) { + return nil, nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", keyLen, len(write.Key())) + } + nsInMap = append(nsInMap, write) + } else { + other = append(other, write) + } + } + return nsInMap, other, nil +} + func isNotExist(err error) bool { return err == ErrNotExist || err == ErrBucketNotExist } @@ -221,30 +498,8 @@ func parseKey(key []byte) (bool, uint64) { func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) { data, err := b.db.Get(ns, _minKey) - switch errors.Cause(err) { - case nil: - vn, err := deserializeVersionedNamespace(data) - if err != nil { - return nil, err - } - return vn, nil - case ErrNotExist, ErrBucketNotExist: - return nil, nil - default: - return nil, err - } -} - -func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) error { - vn, err := b.checkNamespace(ns) if err != nil { - return err - } - if vn == nil { - return errors.Wrapf(ErrNotExist, "namespace = %x doesn't exist", ns) - } - if len(key) != int(vn.keyLen) { - return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key)) + return nil, err } - return nil + return deserializeVersionedNamespace(data) } diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index d13e9afec0..fb99637015 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -8,19 +8,28 @@ package db import ( "context" + "math" "testing" "github.com/pkg/errors" "github.com/stretchr/testify/require" + "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/testutil" ) +var ( + _k5 = []byte("key_5") + _k10 = []byte("key_10") + _errNotExist = ErrNotExist.Error() + _errDeleted = "deleted: not exist in DB" +) + type versionTest struct { ns string k, v []byte height uint64 - err error + err string } func TestVersionedDB(t *testing.T) { @@ -33,29 +42,25 @@ func TestVersionedDB(t *testing.T) { cfg := DefaultConfig cfg.DbPath = testPath - db := NewBoltDBVersioned(cfg) + db := NewBoltDBVersioned(cfg, VnsOption(Namespace{_bucket1, uint32(len(_k2))})) ctx := context.Background() r.NoError(db.Start(ctx)) defer func() { db.Stop(ctx) }() - // namespace and key does not exist + // namespace created vn, err := db.checkNamespace(_bucket1) - r.Nil(vn) - r.Nil(err) - // write first key, namespace and key now exist + r.NoError(err) + r.Equal(uint32(len(_k2)), vn.keyLen) + // write first key r.NoError(db.Put(0, _bucket1, _k2, _v2)) vn, err = db.checkNamespace(_bucket1) r.NoError(err) r.EqualValues(len(_k2), vn.keyLen) // check more Put/Get - var ( - _k5 = []byte("key_5") - _k10 = []byte("key_10") - ) err = db.Put(1, _bucket1, _k10, _v1) - r.Equal("invalid key length, expecting 5, got 6: invalid input", err.Error()) + r.ErrorContains(err, "invalid key length, expecting 5, got 6: invalid input") r.NoError(db.Put(1, _bucket1, _k2, _v1)) r.NoError(db.Put(3, _bucket1, _k2, _v3)) r.NoError(db.Put(6, _bucket1, _k2, _v2)) @@ -63,182 +68,381 @@ func TestVersionedDB(t *testing.T) { r.NoError(db.Put(4, _bucket1, _k4, _v1)) r.NoError(db.Put(7, _bucket1, _k4, _v3)) for _, e := range []versionTest{ - {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v2, 0, nil}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 5, nil}, - {_bucket1, _k2, _v2, 6, nil}, - {_bucket1, _k2, _v2, 7, nil}, // after last write version - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version - {_bucket1, _k4, _v2, 2, nil}, - {_bucket1, _k4, _v2, 3, nil}, - {_bucket1, _k4, _v1, 4, nil}, - {_bucket1, _k4, _v1, 6, nil}, - {_bucket1, _k4, _v3, 7, nil}, - {_bucket1, _k4, _v3, 8, nil}, // larger than last key in bucket - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket2, _k1, nil, 0, _errNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v2, 6, ""}, + {_bucket1, _k2, _v2, 7, ""}, // after last write version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v3, 7, ""}, + {_bucket1, _k4, _v3, 8, ""}, // larger than last key in bucket + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } // overwrite the same height again r.NoError(db.Put(6, _bucket1, _k2, _v4)) r.NoError(db.Put(7, _bucket1, _k4, _v4)) // write to earlier version again is invalid - r.Equal(ErrInvalid, db.Put(3, _bucket1, _k2, _v4)) - r.Equal(ErrInvalid, db.Put(4, _bucket1, _k4, _v4)) + r.ErrorContains(db.Put(3, _bucket1, _k2, _v4), "cannot write at earlier version 3: invalid input") + r.ErrorContains(db.Put(4, _bucket1, _k4, _v4), "cannot write at earlier version 4: invalid input") // write with same value r.NoError(db.Put(9, _bucket1, _k2, _v4)) r.NoError(db.Put(10, _bucket1, _k4, _v4)) for _, e := range []versionTest{ - {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v2, 0, nil}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 5, nil}, - {_bucket1, _k2, _v4, 6, nil}, - {_bucket1, _k2, _v4, 8, nil}, - {_bucket1, _k2, _v4, 9, nil}, - {_bucket1, _k2, _v4, 10, nil}, // after last write version - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version - {_bucket1, _k4, _v2, 2, nil}, - {_bucket1, _k4, _v2, 3, nil}, - {_bucket1, _k4, _v1, 4, nil}, - {_bucket1, _k4, _v1, 6, nil}, - {_bucket1, _k4, _v4, 7, nil}, - {_bucket1, _k4, _v4, 9, nil}, - {_bucket1, _k4, _v4, 10, nil}, - {_bucket1, _k4, _v4, 11, nil}, // larger than last key in bucket - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket2, _k1, nil, 0, _errNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v4, 6, ""}, + {_bucket1, _k2, _v4, 8, ""}, + {_bucket1, _k2, _v4, 9, ""}, + {_bucket1, _k2, _v4, 10, ""}, // after last write version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v4, 7, ""}, + {_bucket1, _k4, _v4, 9, ""}, + {_bucket1, _k4, _v4, 10, ""}, + {_bucket1, _k4, _v4, 11, ""}, // larger than last key in bucket + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } // check version for _, e := range []versionTest{ - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, nil, 9, nil}, - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 10, nil}, - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 9, ""}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 10, ""}, + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Version(e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.height, value) } + v, err := db.Version(_bucket2, _k1) + r.Zero(v) + r.ErrorContains(err, "namespace test_ns2 is non-versioned") // test delete - r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket2, _k1))) for _, k := range [][]byte{_k2, _k4} { r.NoError(db.Delete(11, _bucket1, k)) + r.ErrorContains(db.Delete(10, _bucket1, k), "cannot delete at earlier version 10: invalid input") } for _, k := range [][]byte{_k1, _k3, _k5} { - r.Equal(ErrNotExist, errors.Cause(db.Delete(10, _bucket1, k))) + r.NoError(db.Delete(10, _bucket1, k)) } r.Equal(ErrInvalid, errors.Cause(db.Delete(10, _bucket1, _k10))) // key still can be read before delete version for _, e := range []versionTest{ - {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v2, 0, nil}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 5, nil}, - {_bucket1, _k2, _v4, 6, nil}, - {_bucket1, _k2, _v4, 8, nil}, - {_bucket1, _k2, _v4, 9, nil}, - {_bucket1, _k2, _v4, 10, nil}, // before delete version - {_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version - {_bucket1, _k4, _v2, 2, nil}, - {_bucket1, _k4, _v2, 3, nil}, - {_bucket1, _k4, _v1, 4, nil}, - {_bucket1, _k4, _v1, 6, nil}, - {_bucket1, _k4, _v4, 7, nil}, - {_bucket1, _k4, _v4, 10, nil}, // before delete version - {_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket2, _k1, nil, 0, _errNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v4, 6, ""}, + {_bucket1, _k2, _v4, 8, ""}, + {_bucket1, _k2, _v4, 9, ""}, + {_bucket1, _k2, _v4, 10, ""}, // before delete version + {_bucket1, _k2, nil, 11, _errDeleted}, // after delete version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v4, 7, ""}, + {_bucket1, _k4, _v4, 10, ""}, // before delete version + {_bucket1, _k4, nil, 11, _errDeleted}, // after delete version + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } // write before delete version is invalid - r.Equal(ErrInvalid, db.Put(9, _bucket1, _k2, _k2)) - r.Equal(ErrInvalid, db.Put(9, _bucket1, _k4, _k4)) + r.ErrorContains(db.Put(9, _bucket1, _k2, _k2), "cannot write at earlier version 9: invalid input") + r.ErrorContains(db.Put(9, _bucket1, _k4, _k4), "cannot write at earlier version 9: invalid input") for _, e := range []versionTest{ - {_bucket1, _k2, _v4, 10, nil}, // before delete version - {_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version - {_bucket1, _k4, _v4, 10, nil}, // before delete version - {_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version + {_bucket1, _k2, _v4, 10, ""}, // before delete version + {_bucket1, _k2, nil, 11, _errDeleted}, // after delete version + {_bucket1, _k4, _v4, 10, ""}, // before delete version + {_bucket1, _k4, nil, 11, _errDeleted}, // after delete version } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } // write after delete version r.NoError(db.Put(12, _bucket1, _k2, _k2)) r.NoError(db.Put(12, _bucket1, _k4, _k4)) for _, e := range []versionTest{ - {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v2, 0, nil}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 5, nil}, - {_bucket1, _k2, _v4, 6, nil}, - {_bucket1, _k2, _v4, 8, nil}, - {_bucket1, _k2, _v4, 10, nil}, // before delete version - {_bucket1, _k2, nil, 11, ErrDeleted}, // after delete version - {_bucket1, _k2, _k2, 12, nil}, // after next write version - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version - {_bucket1, _k4, _v2, 2, nil}, - {_bucket1, _k4, _v2, 3, nil}, - {_bucket1, _k4, _v1, 4, nil}, - {_bucket1, _k4, _v1, 6, nil}, - {_bucket1, _k4, _v4, 7, nil}, - {_bucket1, _k4, _v4, 10, nil}, // before delete version - {_bucket1, _k4, nil, 11, ErrDeleted}, // after delete version - {_bucket1, _k4, _k4, 12, nil}, // after next write version - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket2, _k1, nil, 0, _errNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, _v2, 0, ""}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k2, _v4, 6, ""}, + {_bucket1, _k2, _v4, 8, ""}, + {_bucket1, _k2, _v4, 10, ""}, // before delete version + {_bucket1, _k2, nil, 11, _errDeleted}, // after delete version + {_bucket1, _k2, _k2, 12, ""}, // after next write version + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, ""}, + {_bucket1, _k4, _v2, 3, ""}, + {_bucket1, _k4, _v1, 4, ""}, + {_bucket1, _k4, _v1, 6, ""}, + {_bucket1, _k4, _v4, 7, ""}, + {_bucket1, _k4, _v4, 10, ""}, // before delete version + {_bucket1, _k4, nil, 11, _errDeleted}, // after delete version + {_bucket1, _k4, _k4, 12, ""}, // after next write version + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } // check version after delete for _, e := range []versionTest{ - {_bucket1, _k1, nil, 0, ErrNotExist}, - {_bucket1, _k2, nil, 12, nil}, - {_bucket1, _k3, nil, 0, ErrNotExist}, - {_bucket1, _k4, nil, 12, nil}, - {_bucket1, _k5, nil, 0, ErrNotExist}, - {_bucket1, _k10, nil, 0, ErrInvalid}, + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 12, ""}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 12, ""}, + {_bucket1, _k5, nil, 0, _errNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid.Error()}, } { value, err := db.Version(e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.height, value) } + // test filter + r.PanicsWithValue("Filter not supported for versioned DB", func() { db.Filter(0, _bucket1, nil, nil, nil) }) } func TestMultipleWriteDelete(t *testing.T) { + r := require.New(t) + for i := 0; i < 2; i++ { + testPath, err := testutil.PathOfTempFile("test-version") + r.NoError(err) + defer func() { + testutil.CleanupPath(testPath) + }() + + cfg := DefaultConfig + cfg.DbPath = testPath + db := NewBoltDBVersioned(cfg, VnsOption(Namespace{_bucket1, uint32(len(_k2))})) + ctx := context.Background() + r.NoError(db.Start(ctx)) + + if i == 0 { + // multiple writes and deletes + r.NoError(db.Put(1, _bucket1, _k2, _v1)) + r.NoError(db.Put(3, _bucket1, _k2, _v3)) + v, err := db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(3, v) + r.NoError(db.Delete(7, _bucket1, _k2)) + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(10, _bucket1, _k2, _v2)) + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(10, v) + r.NoError(db.Delete(15, _bucket1, _k2)) + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(18, _bucket1, _k2, _v3)) + r.NoError(db.Delete(18, _bucket1, _k2)) + r.NoError(db.Put(18, _bucket1, _k2, _v3)) + r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(21, _bucket1, _k2, _v4)) + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(21, v) + r.NoError(db.Delete(25, _bucket1, _k2)) + r.NoError(db.Put(25, _bucket1, _k2, _k2)) + r.NoError(db.Delete(25, _bucket1, _k2)) + r.NoError(db.Put(25, _bucket1, _k2, _k2)) // write-after-delete + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(25, v) + } else { + // multiple writes and deletes using commitToDB + b := batch.NewBatch() + for _, e := range []versionTest{ + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, nil, 7, ErrDeleted.Error()}, + {_bucket1, _k2, _v2, 10, ""}, + {_bucket1, _k2, nil, 15, ErrDeleted.Error()}, + {_bucket1, _k2, _v3, 18, ErrDeleted.Error()}, // delete-after-write + {_bucket1, _k2, _v4, 21, ""}, + {_bucket1, _k2, _k2, 25, ""}, // write-after-delete + } { + if e.height == 7 || e.height == 15 { + b.Delete(e.ns, e.k, "test") + } else if e.height == 18 { + b.Put(e.ns, e.k, e.v, "test") + b.Delete(e.ns, e.k, "test") + b.Put(e.ns, e.k, e.v, "test") + b.Delete(e.ns, e.k, "test") + } else if e.height == 25 { + b.Delete(e.ns, e.k, "test") + b.Put(e.ns, e.k, e.v, "test") + b.Delete(e.ns, e.k, "test") + b.Put(e.ns, e.k, e.v, "test") + } else { + b.Put(e.ns, e.k, e.v, "test") + } + r.NoError(db.CommitBatch(e.height, b)) + b.Clear() + v, err := db.Version(e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + if err == nil { + r.EqualValues(e.height, v) + } + } + } + for _, e := range []versionTest{ + {_bucket1, _k2, nil, 0, _errNotExist}, + {_bucket1, _k2, _v1, 1, ""}, + {_bucket1, _k2, _v1, 2, ""}, + {_bucket1, _k2, _v3, 3, ""}, + {_bucket1, _k2, _v3, 6, ""}, + {_bucket1, _k2, nil, 7, _errDeleted}, + {_bucket1, _k2, nil, 9, _errDeleted}, + {_bucket1, _k2, _v2, 10, ""}, + {_bucket1, _k2, _v2, 14, ""}, + {_bucket1, _k2, nil, 15, _errDeleted}, + {_bucket1, _k2, nil, 17, _errDeleted}, + {_bucket1, _k2, nil, 18, _errDeleted}, + {_bucket1, _k2, nil, 20, _errDeleted}, + {_bucket1, _k2, _v4, 21, ""}, + {_bucket1, _k2, _v4, 22, ""}, + {_bucket1, _k2, _v4, 24, ""}, + {_bucket1, _k2, _k2, 25, ""}, + {_bucket1, _k2, _k2, 26, ""}, + {_bucket1, _k2, _k2, 25000, ""}, + } { + value, err := db.Get(e.height, e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + r.NoError(db.Stop(ctx)) + } +} + +func TestDedup(t *testing.T) { + r := require.New(t) + + b := batch.NewBatch() + for _, e := range []versionTest{ + {_bucket2, _v1, _v2, 0, ""}, + {_bucket2, _v2, _v3, 9, ""}, + {_bucket2, _v3, _v4, 3, ""}, + {_bucket2, _v4, _v1, 1, ""}, + {_bucket1, _k1, _v1, 0, ""}, + {_bucket1, _k2, _v2, 9, ""}, + {_bucket1, _k3, _v3, 3, ""}, + {_bucket1, _k4, _v4, 1, ""}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + ve, ce, err := dedup(nil, b) + r.NoError(err) + r.Equal(8, len(ve)) + r.Zero(len(ce)) + for i, v := range [][]byte{_k4, _k3, _k2, _k1, _v4, _v3, _v2, _v1} { + r.Equal(v, ve[i].Key()) + } + // put a key with diff length into _bucket2 + b.Put(_bucket2, _k1, _v1, "test") + // treat _bucket1 as versioned namespace still OK + ve, ce, err = dedup(map[string]int{_bucket1: 5}, b) + r.NoError(err) + r.Equal(4, len(ve)) + r.Equal(5, len(ce)) + for i, v := range [][]byte{_k4, _k3, _k2, _k1} { + r.Equal(v, ve[i].Key()) + } + for i, v := range [][]byte{_k1, _v4, _v3, _v2, _v1} { + r.Equal(v, ce[i].Key()) + } + // treat _bucket2 (or both buckets) as versioned namespace hits error due to diff key size + _, _, err = dedup(map[string]int{_bucket2: 7}, b) + r.ErrorContains(err, "invalid key length, expecting 7, got 5: invalid input") + _, _, err = dedup(nil, b) + r.ErrorContains(err, "invalid key length, expecting 5, got 7: invalid input") +} + +func TestCommitBatch(t *testing.T) { r := require.New(t) testPath, err := testutil.PathOfTempFile("test-version") r.NoError(err) @@ -248,75 +452,139 @@ func TestMultipleWriteDelete(t *testing.T) { cfg := DefaultConfig cfg.DbPath = testPath - db := NewBoltDBVersioned(cfg) + db := NewBoltDBVersioned(cfg, VnsOption( + Namespace{_bucket1, uint32(len(_k1))}, Namespace{_bucket2, uint32(len(_v1))})) ctx := context.Background() r.NoError(db.Start(ctx)) defer func() { db.Stop(ctx) }() - // multiple writes and deletes - r.NoError(db.Put(1, _bucket1, _k2, _v1)) - r.NoError(db.Put(3, _bucket1, _k2, _v3)) - v, err := db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(3, v) - r.NoError(db.Delete(7, _bucket1, _k2)) - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(10, _bucket1, _k2, _v2)) - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(10, v) - r.NoError(db.Delete(15, _bucket1, _k2)) - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(18, _bucket1, _k2, _v3)) - r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(18, _bucket1, _k2, _v3)) // write again - value, err := db.Get(18, _bucket1, _k2) - r.NoError(err) - r.Equal(_v3, value) - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(18, v) - r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(21, _bucket1, _k2, _v4)) - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(21, v) - r.NoError(db.Delete(25, _bucket1, _k2)) - r.NoError(db.Put(25, _bucket1, _k2, _k2)) // write-after-delete - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(25, v) + b := batch.NewBatch() + for _, e := range []versionTest{ + {_bucket2, _v1, _k1, 0, ""}, + {_bucket2, _v2, _k2, 9, ""}, + {_bucket2, _v3, _k3, 3, ""}, + {_bucket1, _k1, _v1, 0, ""}, + {_bucket1, _k2, _v2, 9, ""}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + r.NoError(db.CommitBatch(1, b)) + b.Clear() + for _, e := range []versionTest{ + {_bucket2, _v1, nil, 0, _errNotExist}, + {_bucket2, _v2, nil, 0, _errNotExist}, + {_bucket2, _v3, nil, 0, _errNotExist}, + {_bucket2, _v4, nil, 0, _errNotExist}, + {_bucket2, _v1, _k1, 1, ""}, + {_bucket2, _v2, _k2, 1, ""}, + {_bucket2, _v3, _k3, 1, ""}, + {_bucket2, _v1, _k1, 2, ""}, + {_bucket2, _v2, _k2, 2, ""}, + {_bucket2, _v3, _k3, 2, ""}, + {_bucket2, _v4, nil, 2, _errNotExist}, + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 0, _errNotExist}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 0, _errNotExist}, + {_bucket1, _k1, _v1, 1, ""}, + {_bucket1, _k2, _v2, 1, ""}, + {_bucket1, _k1, _v1, 3, ""}, + {_bucket1, _k2, _v2, 3, ""}, + {_bucket1, _k3, nil, 3, _errNotExist}, + {_bucket1, _k4, nil, 3, _errNotExist}, + } { + value, err := db.Get(e.height, e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + + // batch with wrong key length would fail + b.Put(_bucket1, _v1, _k1, "test") + r.Equal(ErrInvalid, errors.Cause(db.CommitBatch(3, b))) + b.Clear() + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 0, ""}, + {_bucket1, _k2, _v3, 9, ""}, + {_bucket1, _k3, _v1, 3, ""}, + {_bucket1, _k4, _v2, 1, ""}, + {_bucket2, _v1, _k3, 0, ""}, + {_bucket2, _v2, _k2, 9, ""}, + {_bucket2, _v3, _k1, 3, ""}, + {_bucket2, _v4, _k4, 1, ""}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + b.Delete(_bucket1, _k3, "test") + b.Delete(_bucket2, _v3, "test") + + r.NoError(db.CommitBatch(5, b)) + b.Clear() for _, e := range []versionTest{ - {_bucket1, _k2, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 6, nil}, - {_bucket1, _k2, nil, 7, ErrDeleted}, - {_bucket1, _k2, nil, 9, ErrDeleted}, - {_bucket1, _k2, _v2, 10, nil}, - {_bucket1, _k2, _v2, 14, nil}, - {_bucket1, _k2, nil, 15, ErrDeleted}, - {_bucket1, _k2, nil, 17, ErrDeleted}, - {_bucket1, _k2, nil, 18, ErrDeleted}, - {_bucket1, _k2, nil, 20, ErrDeleted}, - {_bucket1, _k2, _v4, 21, nil}, - {_bucket1, _k2, _v4, 22, nil}, - {_bucket1, _k2, _v4, 24, nil}, - {_bucket1, _k2, _k2, 25, nil}, - {_bucket1, _k2, _k2, 26, nil}, - {_bucket1, _k2, _k2, 25000, nil}, + {_bucket1, _k1, nil, 0, _errNotExist}, + {_bucket1, _k2, nil, 0, _errNotExist}, + {_bucket1, _k3, nil, 0, _errNotExist}, + {_bucket1, _k4, nil, 0, _errNotExist}, + {_bucket2, _v1, nil, 0, _errNotExist}, + {_bucket2, _v2, nil, 0, _errNotExist}, + {_bucket2, _v3, nil, 0, _errNotExist}, + {_bucket2, _v4, nil, 0, _errNotExist}, } { value, err := db.Get(e.height, e.ns, e.k) - r.Equal(e.err, errors.Cause(err)) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } r.Equal(e.v, value) } + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 1, ""}, + {_bucket1, _k2, _v2, 1, ""}, + {_bucket1, _k3, nil, 1, _errNotExist}, + {_bucket1, _k4, nil, 1, _errNotExist}, + {_bucket2, _v1, _k1, 1, ""}, + {_bucket2, _v2, _k2, 1, ""}, + {_bucket2, _v3, _k3, 1, ""}, + {_bucket2, _v4, nil, 1, _errNotExist}, + } { + for _, h := range []uint64{1, 2, 3, 4} { + value, err := db.Get(h, e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + } + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 5, ""}, + {_bucket1, _k2, _v3, 5, ""}, + {_bucket1, _k3, nil, 5, _errDeleted}, + {_bucket1, _k4, _v2, 5, ""}, + {_bucket2, _v1, _k3, 5, ""}, + {_bucket2, _v2, _k2, 5, ""}, + {_bucket2, _v3, nil, 5, _errDeleted}, + {_bucket2, _v4, _k4, 5, ""}, + } { + for _, h := range []uint64{5, 16, 64, 3000, math.MaxUint64} { + value, err := db.Get(h, e.ns, e.k) + if len(e.err) == 0 { + r.NoError(err) + } else { + r.ErrorContains(err, e.err) + } + r.Equal(e.v, value) + } + } + // cannot write to earlier version + b.Put(_bucket1, _k1, _v2, "test") + b.Put(_bucket1, _k2, _v1, "test") + r.ErrorIs(db.CommitBatch(4, b), ErrInvalid) }