Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UT for disttae.reader #21259

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ func (txn *Transaction) ReadOnly() bool {
return txn.readOnly.Load()
}

// genRowIdCol generates one row id per row of bat and prepends the resulting
// row-id vector (and its attribute name, objectio.PhysicalAddr_Attr) to the
// batch, so the physical-address column is always the batch's first column.
// It returns the newly created vector, or an error if appending to it fails.
// It panics if bat's first vector is already a row-id column, i.e. the row-id
// column was generated twice for the same batch.
func (txn *Transaction) genRowIdCol(bat *batch.Batch) (vec *vector.Vector, err error) {
	if bat.Vecs[0].GetType().Oid == types.T_Rowid {
		panic("RowId column had been generated in batch")
	}
	// NOTE(review): genBlock presumably advances the transaction to a fresh
	// block id so the generated row ids land in a new block — confirm.
	txn.genBlock()
	// Avoid shadowing the builtin len: use a descriptive name for the count.
	rowCount := bat.RowCount()
	vec = vector.NewVec(types.T_Rowid.ToType())
	for i := 0; i < rowCount; i++ {
		if err := vector.AppendFixed(vec, txn.genRowId(), false,
			txn.proc.Mp()); err != nil {
			return nil, err
		}
	}
	// Prepend both the vector and its attribute name to keep Vecs and Attrs
	// aligned index-for-index.
	bat.Vecs = append([]*vector.Vector{vec}, bat.Vecs...)
	bat.Attrs = append([]string{objectio.PhysicalAddr_Attr}, bat.Attrs...)
	return vec, nil
}

// WriteBatch used to write data to the transaction buffer
// insert/delete/update all use this api
// truncate : it denotes the batch with typ DELETE on mo_tables is generated when Truncating
Expand Down Expand Up @@ -114,20 +132,10 @@ func (txn *Transaction) WriteBatch(
// generate rowid for insert
// TODO(aptend): move this outside WriteBatch? Call twice for the same batch will generate different rowid
if typ == INSERT {
if bat.Vecs[0].GetType().Oid == types.T_Rowid {
panic("rowid should not be generated in Insert WriteBatch")
}
txn.genBlock()
len := bat.RowCount()
genRowidVec = vector.NewVec(types.T_Rowid.ToType())
for i := 0; i < len; i++ {
if err := vector.AppendFixed(genRowidVec, txn.genRowId(), false,
txn.proc.Mp()); err != nil {
return nil, err
}
genRowidVec, err = txn.genRowIdCol(bat)
if err != nil {
return nil, err
}
bat.Vecs = append([]*vector.Vector{genRowidVec}, bat.Vecs...)
bat.Attrs = append([]string{objectio.PhysicalAddr_Attr}, bat.Attrs...)
if tableId != catalog.MO_DATABASE_ID &&
tableId != catalog.MO_TABLES_ID && tableId != catalog.MO_COLUMNS_ID {
txn.approximateInMemInsertSize += uint64(bat.Size())
Expand Down Expand Up @@ -1074,7 +1082,7 @@ func (txn *Transaction) genRowId() types.Rowid {
return types.DecodeFixed[types.Rowid](types.EncodeSlice(txn.rowId[:]))
}

func (txn *Transaction) mergeTxnWorkspaceLocked(ctx context.Context) error {
func (txn *Transaction) MergeTxnWorkspaceLocked(ctx context.Context) error {
txn.restoreTxnTableFunc = txn.restoreTxnTableFunc[:0]

if len(txn.batchSelectList) > 0 {
Expand Down Expand Up @@ -1325,7 +1333,7 @@ func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error) {
return nil, err
}

if err := txn.mergeTxnWorkspaceLocked(ctx); err != nil {
if err := txn.MergeTxnWorkspaceLocked(ctx); err != nil {
return nil, err
}
if err := txn.dumpBatchLocked(ctx, -1); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ func (txn *Transaction) IncrStatementID(ctx context.Context, commit bool) error
txn.Lock()
defer txn.Unlock()
//merge writes for the last statement
if err := txn.mergeTxnWorkspaceLocked(ctx); err != nil {
if err := txn.MergeTxnWorkspaceLocked(ctx); err != nil {
return err
}
// dump batch to s3, starting from 0 (begining of the workspace)
Expand Down
189 changes: 165 additions & 24 deletions pkg/vm/engine/test/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ func Test_ReaderCanReadRangesBlocksWithoutDeletes(t *testing.T) {
}

func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) {
t.Skip("not finished")
var (
err error
mp *mpool.MPool
Expand Down Expand Up @@ -212,14 +211,32 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) {

rowsCount := 10
bat1 := catalog2.MockBatch(schema, rowsCount)

// write table
// insert rows, delete some of them, then read.
{
_, relation, txn, err = disttaeEngine.GetTable(ctx, databaseName, tableName)
require.NoError(t, err)

require.NoError(t, relation.Write(ctx, containers.ToCNBatch(bat1)))

txn.GetWorkspace().UpdateSnapshotWriteOffset()

reader, err := testutil.GetRelationReader(
ctx,
disttaeEngine,
txn,
relation,
nil,
mp,
t,
)
require.NoError(t, err)

ret := testutil.EmptyBatchFromSchema(schema, primaryKeyIdx)
_, err = reader.Read(ctx, ret.Attrs, nil, mp, ret)
require.NoError(t, err)
require.Equal(t, rowsCount, int(ret.RowCount()))
reader.Close()

var bat2 *batch.Batch
txn.GetWorkspace().(*disttae.Transaction).ForEachTableWrites(
relation.GetDBID(ctx), relation.GetTableID(ctx), 1, func(entry disttae.Entry) {
Expand All @@ -228,35 +245,159 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) {
bat2 = batch.NewWithSize(1)
bat2.Vecs[0] = vector.NewVec(types.T_Rowid.ToType())
require.NoError(t, vector.AppendFixedList[types.Rowid](bat2.Vecs[0], waitedDeletes, nil, mp))
bat2.SetRowCount(len(waitedDeletes))
})

require.NoError(t, relation.Delete(ctx, bat2, catalog.Row_ID))
}

expr := []*plan.Expr{
readutil.MakeFunctionExprForTest("=", []*plan.Expr{
readutil.MakeColExprForTest(int32(primaryKeyIdx), schema.ColDefs[primaryKeyIdx].Type.Oid, schema.ColDefs[primaryKeyIdx].Name),
plan2.MakePlan2Int64ConstExprWithType(bat1.Vecs[primaryKeyIdx].Get(9).(int64)),
}),
//expr := []*plan.Expr{
// readutil.MakeFunctionExprForTest("=", []*plan.Expr{
// readutil.MakeColExprForTest(int32(primaryKeyIdx),
// schema.ColDefs[primaryKeyIdx].Type.Oid),
// plan2.MakePlan2Int64ConstExprWithType(bat1.Vecs[primaryKeyIdx].Get(9).(int64)),
// }),
//}
inVec := bat1.Vecs[primaryKeyIdx].Window(rowsCount/2, 1)
expr := []*plan.Expr{
readutil.ConstructInExpr(ctx, schema.GetPrimaryKey().Name, inVec.GetDownstreamVector()),
}

// Note that we must merge the workspace before reading.
require.NoError(t, txn.GetWorkspace().(*disttae.Transaction).MergeTxnWorkspaceLocked(ctx))

reader, err = testutil.GetRelationReader(
ctx,
disttaeEngine,
txn,
relation,
expr,
mp,
t,
)
require.NoError(t, err)

ret = testutil.EmptyBatchFromSchema(schema, primaryKeyIdx)
_, err = reader.Read(ctx, ret.Attrs, expr[0], mp, ret)
require.NoError(t, err)

require.Equal(t, 1, int(ret.RowCount()))
reader.Close()
require.NoError(t, txn.Commit(ctx))
}

reader, err := testutil.GetRelationReader(
ctx,
disttaeEngine,
txn,
relation,
expr,
mp,
t,
)
require.NoError(t, err)
//Add UT for issue # 21179, txn can read more than 8192 uncommitted rows.
{
rowsOfBlock := 8192
bat := catalog2.MockBatch(schema, rowsOfBlock*2)
bats := bat.Split(2)

ret := testutil.EmptyBatchFromSchema(schema, primaryKeyIdx)
_, err = reader.Read(ctx, ret.Attrs, expr[0], mp, ret)
require.NoError(t, err)
_, relation, txn, err = disttaeEngine.GetTable(ctx, databaseName, tableName)
require.NoError(t, err)

// read the count of rows in the relation.
reader, err := testutil.GetRelationReader(
ctx,
disttaeEngine,
txn,
relation,
nil,
mp,
t,
)
require.NoError(t, err)

ret := testutil.EmptyBatchFromSchema(schema, primaryKeyIdx)
rows := 0
for {
isEnd, err := reader.Read(ctx, ret.Attrs, nil, mp, ret)
require.NoError(t, err)
if isEnd {
break
}
rows += ret.RowCount()
}
require.NoError(t, err)
reader.Close()

require.NoError(t, relation.Write(ctx, containers.ToCNBatch(bats[0])))

require.NoError(t, relation.Write(ctx, containers.ToCNBatch(bats[1])))

txn.GetWorkspace().UpdateSnapshotWriteOffset()

reader, err = testutil.GetRelationReader(
ctx,
disttaeEngine,
txn,
relation,
nil,
mp,
t,
)
require.NoError(t, err)

ret = testutil.EmptyBatchFromSchema(schema, primaryKeyIdx)
cnt := 0
for {
isEnd, err := reader.Read(ctx, ret.Attrs, nil, mp, ret)
require.NoError(t, err)
if isEnd {
break
}
cnt += ret.RowCount()
}
require.Equal(t, rowsOfBlock*2+rows, cnt)

//delete rows that belong to the same block.
var bat2 *batch.Batch
var bat3 *batch.Batch
txn.GetWorkspace().(*disttae.Transaction).ForEachTableWrites(
relation.GetDBID(ctx), relation.GetTableID(ctx), 1, func(entry disttae.Entry) {
waitedDeletes := vector.MustFixedColWithTypeCheck[types.Rowid](entry.Bat().GetVector(0))
waitedDeletes1 := waitedDeletes[:rowsOfBlock/2]
bat2 = batch.NewWithSize(1)
bat2.Vecs[0] = vector.NewVec(types.T_Rowid.ToType())
require.NoError(t, vector.AppendFixedList[types.Rowid](bat2.Vecs[0], waitedDeletes1, nil, mp))
bat2.SetRowCount(len(waitedDeletes1))

waitedDeletes2 := waitedDeletes[rowsOfBlock/2 : rowsOfBlock]
bat3 = batch.NewWithSize(1)
bat3.Vecs[0] = vector.NewVec(types.T_Rowid.ToType())
require.NoError(t, vector.AppendFixedList[types.Rowid](bat3.Vecs[0], waitedDeletes2, nil, mp))
bat3.SetRowCount(len(waitedDeletes2))
})

require.NoError(t, relation.Delete(ctx, bat2, catalog.Row_ID))
require.NoError(t, relation.Delete(ctx, bat3, catalog.Row_ID))
//Note that we must merge the workspace before reading.
require.NoError(t, txn.GetWorkspace().(*disttae.Transaction).MergeTxnWorkspaceLocked(ctx))

reader, err = testutil.GetRelationReader(
ctx,
disttaeEngine,
txn,
relation,
nil,
mp,
t,
)
require.NoError(t, err)

ret = testutil.EmptyBatchFromSchema(schema, primaryKeyIdx)
cnt = 0
for {
isEnd, err := reader.Read(ctx, ret.Attrs, nil, mp, ret)
require.NoError(t, err)
if isEnd {
break
}
cnt += ret.RowCount()
}
require.Equal(t, rowsOfBlock+rows, cnt)
reader.Close()
require.NoError(t, txn.Commit(ctx))
}

require.Equal(t, 1, int(ret.RowCount()))
require.NoError(t, txn.Commit(ctx))
}

func Test_ReaderCanReadCommittedInMemInsertAndDeletes(t *testing.T) {
Expand Down
Loading