Skip to content

Commit

Permalink
add AddVersionedNamespace() method
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Dec 20, 2024
1 parent 44f780c commit 71f01e3
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 31 deletions.
63 changes: 35 additions & 28 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type (
// Base returns the underlying KVStore
Base() KVStore

// AddVersionedNamespace adds a versioned namespace
AddVersionedNamespace(string, int) error

// Version returns the key's most recent version
Version(string, []byte) (uint64, error)
}
Expand Down Expand Up @@ -77,6 +80,24 @@ func (b *BoltDBVersioned) Base() KVStore {
return b.db
}

// AddVersionedNamespace adds a versioned namespace
func (b *BoltDBVersioned) AddVersionedNamespace(ns string, keyLen uint32) error {
vn, err := b.checkNamespace(ns)
if cause := errors.Cause(err); cause == ErrNotExist || cause == ErrBucketNotExist {
// create metadata for namespace
return b.db.Put(ns, _minKey, (&versionedNamespace{
keyLen: keyLen,
}).serialize())
}
if err != nil {
return err
}
if vn.keyLen != keyLen {
return errors.Wrapf(ErrInvalid, "namespace %s already exists with key length = %d, got %d", ns, vn.keyLen, keyLen)
}
return nil
}

// Put writes a <key, value> record
func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) error {
if !b.db.IsReady() {
Expand All @@ -88,22 +109,15 @@ func (b *BoltDBVersioned) Put(version uint64, ns string, key, value []byte) erro
return err
}
buf := batch.NewBatch()
if vn == nil {
// namespace not yet created
buf.Put(ns, _minKey, (&versionedNamespace{
keyLen: uint32(len(key)),
}).serialize(), "failed to create metadata")
} else {
if len(key) != int(vn.keyLen) {
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key))
}
last, _, err := b.get(math.MaxUint64, ns, key)
if !isNotExist(err) && version < last {
// not allowed to perform write on an earlier version
return ErrInvalid
}
buf.Delete(ns, keyForDelete(key, version), fmt.Sprintf("failed to delete key %x", key))
if len(key) != int(vn.keyLen) {
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key))
}
last, _, err := b.get(math.MaxUint64, ns, key)
if !isNotExist(err) && version < last {
// not allowed to perform write on an earlier version
return ErrInvalid
}
buf.Delete(ns, keyForDelete(key, version), fmt.Sprintf("failed to delete key %x", key))
buf.Put(ns, keyForWrite(key, version), value, fmt.Sprintf("failed to put key %x", key))
return b.db.WriteBatch(buf)
}
Expand Down Expand Up @@ -455,28 +469,21 @@ func parseKey(key []byte) (bool, uint64) {

func (b *BoltDBVersioned) checkNamespace(ns string) (*versionedNamespace, error) {
data, err := b.db.Get(ns, _minKey)
switch errors.Cause(err) {
case nil:
vn, err := deserializeVersionedNamespace(data)
if err != nil {
return nil, err
}
return vn, nil
case ErrNotExist, ErrBucketNotExist:
return nil, nil
default:
if err != nil {
return nil, err
}
vn, err := deserializeVersionedNamespace(data)
if err != nil {
return nil, err
}
return vn, nil
}

func (b *BoltDBVersioned) checkNamespaceAndKey(ns string, key []byte) error {
vn, err := b.checkNamespace(ns)
if err != nil {
return err
}
if vn == nil {
return errors.Wrapf(ErrNotExist, "namespace = %x doesn't exist", ns)
}
if len(key) != int(vn.keyLen) {
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, len(key))
}
Expand Down
14 changes: 11 additions & 3 deletions db/db_versioned_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,18 @@ func TestVersionedDB(t *testing.T) {
// namespace and key does not exist
vn, err := db.checkNamespace(_bucket1)
r.Nil(vn)
r.Nil(err)
// write first key, namespace and key now exist
r.ErrorContains(err, ErrNotExist.Error())
// create namespace
r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k2))))
r.ErrorContains(db.AddVersionedNamespace(_bucket1, 8), "namespace test_ns1 already exists with key length = 5, got 8")
// write first key
r.NoError(db.Put(0, _bucket1, _k2, _v2))
vn, err = db.checkNamespace(_bucket1)
r.NoError(err)
r.EqualValues(len(_k2), vn.keyLen)
// check more Put/Get
err = db.Put(1, _bucket1, _k10, _v1)
r.Equal("invalid key length, expecting 5, got 6: invalid input", err.Error())
r.ErrorContains(err, "invalid key length, expecting 5, got 6: invalid input")
r.NoError(db.Put(1, _bucket1, _k2, _v1))
r.NoError(db.Put(3, _bucket1, _k2, _v3))
r.NoError(db.Put(6, _bucket1, _k2, _v2))
Expand Down Expand Up @@ -256,6 +259,8 @@ func TestMultipleWriteDelete(t *testing.T) {
ctx := context.Background()
r.NoError(db.Start(ctx))

// create namespace
r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k2))))
if i == 0 {
// multiple writes and deletes
r.NoError(db.Put(1, _bucket1, _k2, _v1))
Expand Down Expand Up @@ -436,6 +441,9 @@ func TestCommitToDB(t *testing.T) {
b.Put(e.ns, e.k, e.v, "test")
}

// create namespace
r.NoError(db.AddVersionedNamespace(_bucket1, uint32(len(_k1))))
r.NoError(db.AddVersionedNamespace(_bucket2, uint32(len(_v1))))
r.NoError(db.CommitToDB(1, nil, b))
b.Clear()
for _, e := range []versionTest{
Expand Down

0 comments on commit 71f01e3

Please sign in to comment.