Skip to content

Commit

Permalink
Update incremental dedup (#20439)
Browse files Browse the repository at this point in the history
Update incremental dedup

Approved by: @heni02, @XuPeng-SH
  • Loading branch information
jiangxinmeng1 authored Dec 2, 2024
1 parent b96f029 commit 581bd58
Show file tree
Hide file tree
Showing 24 changed files with 221 additions and 217 deletions.
11 changes: 10 additions & 1 deletion pkg/vm/engine/tae/catalog/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,16 @@ func (entry *ObjectEntry) GetUpdateEntry(
dropped.DeleteNode = *txnbase.NewTxnMVCCNodeWithTxn(txn)
return
}

func (entry *ObjectEntry) VisibleByTS(ts types.TS) bool {
// visible by end
if entry.CreatedAt.GT(&ts) {
return false
}
if entry.DeleteBefore(ts) {
return false
}
return true
}
func (entry *ObjectEntry) DeleteBefore(ts types.TS) bool {
deleteTS := entry.GetDeleteAt()
if deleteTS.IsEmpty() {
Expand Down
14 changes: 7 additions & 7 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2308,7 +2308,7 @@ func TestSnapshotIsolation1(t *testing.T) {
// Step 4
err = rel1.UpdateByFilter(context.Background(), filter, 3, int64(1111), false)
t.Log(err)
assert.True(t, moerr.IsMoErrCode(err, moerr.ErrTxnWWConflict))
assert.True(t, moerr.IsMoErrCode(err, moerr.ErrNotFound))
_ = txn1.Rollback(context.Background())

// Step 5
Expand Down Expand Up @@ -3467,10 +3467,10 @@ func TestImmutableIndexInAblk(t *testing.T) {
rowIDs.Append(nil, true)
}
err = meta.GetObjectData().GetDuplicatedRows(
context.Background(), txn, bat.Vecs[1], nil, false, true, false, rowIDs, common.DefaultAllocator,
context.Background(), txn, bat.Vecs[1], nil, types.TS{}, types.MaxTs(), rowIDs, common.DefaultAllocator,
)
assert.NoError(t, err)
err = meta.GetObjectData().Contains(context.Background(), txn, false, rowIDs, nil, common.DebugAllocator)
err = meta.GetObjectData().Contains(context.Background(), txn, rowIDs, nil, common.DebugAllocator)
assert.NoError(t, err)
duplicate := false
rowIDs.Foreach(func(v any, isNull bool, row int) error {
Expand Down Expand Up @@ -7983,8 +7983,6 @@ func TestSnapshotLag1(t *testing.T) {
bats := data.Split(4)
tae.CreateRelAndAppend(bats[0], true)

txn1, rel1 := tae.GetRelation()
assert.NoError(t, rel1.Append(context.Background(), bats[1]))
txn2, rel2 := tae.GetRelation()
assert.NoError(t, rel2.Append(context.Background(), bats[1]))

Expand All @@ -7994,9 +7992,11 @@ func TestSnapshotLag1(t *testing.T) {
assert.NoError(t, txn.Commit(context.Background()))
}

txn1.MockStartTS(tae.TxnMgr.Now())
err := txn1.Commit(context.Background())
txn1, rel1 := tae.GetRelation()
err := rel1.Append(context.Background(), bats[1])
assert.True(t, moerr.IsMoErrCode(err, moerr.ErrDuplicateEntry))
err = txn1.Commit(context.Background())
assert.NoError(t, err)
err = txn2.Commit(context.Background())
assert.True(t, moerr.IsMoErrCode(err, moerr.ErrTxnWWConflict))
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/vm/engine/tae/iface/data/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,10 @@ type Object interface {
txn txnif.TxnReader,
keys containers.Vector,
keysZM index.ZM,
precommit bool,
checkWWConflict bool,
skipCommittedBeforeTxnForAblk bool,
from, to types.TS,
rowIDs containers.Vector,
mp *mpool.MPool,
) (err error)
GetMaxRowByTS(ts types.TS) (uint32, error)
GetValue(ctx context.Context, txn txnif.AsyncTxn, readSchema any, blkID uint16, row, col int, skipCheckDelete bool, mp *mpool.MPool) (any, bool, error)
PPString(level common.PPLevel, depth int, prefix string, blkid int) string
EstimateMemSize() (int, int)
Expand All @@ -107,7 +104,6 @@ type Object interface {
Contains(
ctx context.Context,
txn txnif.TxnReader,
isCommitting bool,
keys containers.Vector,
keysZM index.ZM,
mp *mpool.MPool) (err error)
Expand Down
15 changes: 9 additions & 6 deletions pkg/vm/engine/tae/index/indexwrapper/mutindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,8 @@ func (idx *MutIndex) GetDuplicatedRows(
keysZM index.ZM,
blkID *types.Blockid,
rowIDs *vector.Vector,
maxVisibleRow uint32,
getRowOffsetFn func() (min, max int32, err error),
skipFn func(row uint32) error,
skipCommittedBeforeTxnForAblk bool,
mp *mpool.MPool,
) (err error) {
if keysZM.Valid() {
Expand All @@ -160,6 +159,10 @@ func (idx *MutIndex) GetDuplicatedRows(
return
}
}
minVisibleRow, maxVisibleRow, err := getRowOffsetFn()
if err != nil {
return
}
op := func(v []byte, _ bool, offset int) error {
if !rowIDs.IsNull(uint64(offset)) {
return nil
Expand All @@ -174,13 +177,13 @@ func (idx *MutIndex) GetDuplicatedRows(
return err
}
}
if skipCommittedBeforeTxnForAblk {
return nil
}
var maxRow uint32
exist := false
for i := len(rows) - 1; i >= 0; i-- {
if rows[i] < maxVisibleRow {
if int32(rows[i]) <= minVisibleRow {
break
}
if int32(rows[i]) < maxVisibleRow {
maxRow = rows[i]
exist = true
break
Expand Down
47 changes: 27 additions & 20 deletions pkg/vm/engine/tae/tables/aobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ func (obj *aobject) GetDuplicatedRows(
txn txnif.TxnReader,
keys containers.Vector,
keysZM index.ZM,
precommit bool,
checkWWConflict bool,
skipCommittedBeforeTxnForAblk bool,
from, to types.TS,
rowIDs containers.Vector,
mp *mpool.MPool,
) (err error) {
Expand All @@ -211,45 +209,57 @@ func (obj *aobject) GetDuplicatedRows(
}()
node := obj.PinNode()
defer node.Unref()
maxRow := uint32(math.MaxUint32)
if !precommit {
maxRow, err = obj.GetMaxRowByTS(txn.GetStartTS())
}
if !node.IsPersisted() {
fn := func() (min, max int32, err error) {
obj.RUnlock()
defer obj.RLock()
max, err = obj.GetMaxRowByTS(to)
if err != nil {
return
}
min, err = obj.GetMaxRowByTS(from)
if err != nil {
return
}
return
}
return node.GetDuplicatedRows(
ctx,
txn,
maxRow,
fn,
keys,
keysZM,
rowIDs,
precommit,
checkWWConflict,
skipCommittedBeforeTxnForAblk,
mp,
)
} else {
return obj.persistedGetDuplicatedRows(
ctx,
txn,
skipCommittedBeforeTxnForAblk,
from, to,
keys,
keysZM,
rowIDs,
true,
maxRow,
mp,
)
}
}

func (obj *aobject) GetMaxRowByTS(ts types.TS) (uint32, error) {
func (obj *aobject) GetMaxRowByTS(ts types.TS) (int32, error) {
if ts.IsEmpty() {
return -1, nil
}
maxTS := types.MaxTs()
if ts.EQ(&maxTS) {
return math.MaxInt32, nil
}
node := obj.PinNode()
defer node.Unref()
if !node.IsPersisted() {
obj.RLock()
defer obj.RUnlock()
return obj.appendMVCC.GetMaxRowByTSLocked(ts), nil
return int32(obj.appendMVCC.GetMaxRowByTSLocked(ts)), nil
} else {
vec, err := obj.LoadPersistedCommitTS(0)
if err != nil {
Expand All @@ -260,16 +270,15 @@ func (obj *aobject) GetMaxRowByTS(ts types.TS) (uint32, error) {
vec.GetDownstreamVector())
for i := range tsVec {
if tsVec[i].GT(&ts) {
return uint32(i), nil
return int32(i), nil
}
}
return uint32(vec.Length()), nil
return int32(vec.Length()), nil
}
}
func (obj *aobject) Contains(
ctx context.Context,
txn txnif.TxnReader,
precommit bool,
keys containers.Vector,
keysZM index.ZM,
mp *mpool.MPool,
Expand All @@ -287,14 +296,12 @@ func (obj *aobject) Contains(
keys,
keysZM,
txn,
precommit,
mp,
)
} else {
return obj.persistedContains(
ctx,
txn,
precommit,
keys,
keysZM,
true,
Expand Down
17 changes: 6 additions & 11 deletions pkg/vm/engine/tae/tables/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ func (obj *baseObject) getDuplicateRowsWithLoad(
rowIDs containers.Vector,
blkOffset uint16,
isAblk bool,
skipCommittedBeforeTxnForAblk bool,
maxVisibleRow uint32,
from, to types.TS,
mp *mpool.MPool,
) (err error) {
schema := obj.meta.Load().GetSchema()
Expand All @@ -272,13 +271,12 @@ func (obj *baseObject) getDuplicateRowsWithLoad(
dedupFn = containers.MakeForeachVectorOp(
keys.GetType().Oid,
getRowIDAlkFunctions,
data.Vecs[0],
data.Vecs[0], //
rowIDs,
blkID,
maxVisibleRow,
obj.LoadPersistedCommitTS,
txn,
skipCommittedBeforeTxnForAblk,
from, to,
)
} else {
dedupFn = containers.MakeForeachVectorOp(
Expand Down Expand Up @@ -307,7 +305,6 @@ func (obj *baseObject) containsWithLoad(
sels *nulls.Bitmap,
blkOffset uint16,
isAblk bool,
isCommitting bool,
mp *mpool.MPool,
) (err error) {
schema := obj.meta.Load().GetSchema()
Expand Down Expand Up @@ -380,12 +377,11 @@ func (obj *baseObject) containsWithLoad(
func (obj *baseObject) persistedGetDuplicatedRows(
ctx context.Context,
txn txnif.TxnReader,
skipCommittedBeforeTxnForAblk bool,
from, to types.TS,
keys containers.Vector,
keysZM index.ZM,
rowIDs containers.Vector,
isAblk bool,
maxVisibleRow uint32,
mp *mpool.MPool,
) (err error) {
pkIndex, err := MakeImmuIndex(
Expand All @@ -410,7 +406,7 @@ func (obj *baseObject) persistedGetDuplicatedRows(
continue
}
err = obj.getDuplicateRowsWithLoad(
ctx, txn, keys, sels, rowIDs, uint16(i), isAblk, skipCommittedBeforeTxnForAblk, maxVisibleRow, mp)
ctx, txn, keys, sels, rowIDs, uint16(i), isAblk, from, to, mp)
if err != nil {
return err
}
Expand All @@ -421,7 +417,6 @@ func (obj *baseObject) persistedGetDuplicatedRows(
func (obj *baseObject) persistedContains(
ctx context.Context,
txn txnif.TxnReader,
isCommitting bool,
keys containers.Vector,
keysZM index.ZM,
isAblk bool,
Expand Down Expand Up @@ -449,7 +444,7 @@ func (obj *baseObject) persistedContains(
continue
}
err = obj.containsWithLoad(
ctx, txn, keys, sels, uint16(i), isAblk, isCommitting, mp)
ctx, txn, keys, sels, uint16(i), isAblk, mp)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 581bd58

Please sign in to comment.