From 14f30a02fa22f350a22822212d673c70556128b1 Mon Sep 17 00:00:00 2001 From: triump2020 Date: Thu, 16 Jan 2025 21:46:21 +0800 Subject: [PATCH 1/2] update --- pkg/vm/engine/disttae/txn.go | 38 ++++--- pkg/vm/engine/disttae/types.go | 2 +- pkg/vm/engine/test/reader_test.go | 158 +++++++++++++++++++++++++----- 3 files changed, 158 insertions(+), 40 deletions(-) diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go index 7c369c3ae0660..0b25f1d473cd1 100644 --- a/pkg/vm/engine/disttae/txn.go +++ b/pkg/vm/engine/disttae/txn.go @@ -73,6 +73,24 @@ func (txn *Transaction) ReadOnly() bool { return txn.readOnly.Load() } +func (txn *Transaction) genRowIdCol(bat *batch.Batch) (vec *vector.Vector, err error) { + if bat.Vecs[0].GetType().Oid == types.T_Rowid { + panic("RowId column had been generated in batch") + } + txn.genBlock() + len := bat.RowCount() + vec = vector.NewVec(types.T_Rowid.ToType()) + for i := 0; i < len; i++ { + if err := vector.AppendFixed(vec, txn.genRowId(), false, + txn.proc.Mp()); err != nil { + return nil, err + } + } + bat.Vecs = append([]*vector.Vector{vec}, bat.Vecs...) + bat.Attrs = append([]string{objectio.PhysicalAddr_Attr}, bat.Attrs...) + return +} + // WriteBatch used to write data to the transaction buffer // insert/delete/update all use this api // truncate : it denotes the batch with typ DELETE on mo_tables is generated when Truncating @@ -114,20 +132,10 @@ func (txn *Transaction) WriteBatch( // generate rowid for insert // TODO(aptend): move this outside WriteBatch? Call twice for the same batch will generate different rowid if typ == INSERT { - if bat.Vecs[0].GetType().Oid == types.T_Rowid { - panic("rowid should not be generated in Insert WriteBatch") - } - txn.genBlock() - len := bat.RowCount() - genRowidVec = vector.NewVec(types.T_Rowid.ToType()) - for i := 0; i < len; i++ { - if err := vector.AppendFixed(genRowidVec, txn.genRowId(), false, - txn.proc.Mp()); err != nil { - return nil, err - } + genRowidVec, err = txn.genRowIdCol(bat) + if err != nil { + return nil, err } - bat.Vecs = append([]*vector.Vector{genRowidVec}, bat.Vecs...) - bat.Attrs = append([]string{objectio.PhysicalAddr_Attr}, bat.Attrs...) 
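// Editor's note: the hunk above lifts the inline rowid generation out of
// WriteBatch and into the new genRowIdCol helper, which appends one generated
// rowid per row to a fresh vector and then prepends both that vector and its
// attribute name to the batch (it keeps the original guard that panics if a
// rowid column is already present). A minimal, self-contained sketch of the
// same "generate then prepend" pattern, using plain slices instead of the
// engine's vector/batch types; every name below is illustrative and not part
// of the matrixone API.
package main

import "fmt"

type fakeBatch struct {
	attrs []string
	cols  [][]uint64 // stand-in for the batch's []*vector.Vector
}

// prependRowIDCol mirrors genRowIdCol: it generates one value per existing
// row and places the new column (and its attribute) at position 0.
func prependRowIDCol(b *fakeBatch, gen func() uint64) {
	rows := 0
	if len(b.cols) > 0 {
		rows = len(b.cols[0])
	}
	rowIDs := make([]uint64, 0, rows)
	for i := 0; i < rows; i++ {
		rowIDs = append(rowIDs, gen())
	}
	b.cols = append([][]uint64{rowIDs}, b.cols...)
	b.attrs = append([]string{"rowid"}, b.attrs...)
}

func main() {
	next := uint64(0)
	b := &fakeBatch{attrs: []string{"a"}, cols: [][]uint64{{10, 20, 30}}}
	prependRowIDCol(b, func() uint64 { next++; return next })
	fmt.Println(b.attrs, b.cols) // [rowid a] [[1 2 3] [10 20 30]]
}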
if tableId != catalog.MO_DATABASE_ID && tableId != catalog.MO_TABLES_ID && tableId != catalog.MO_COLUMNS_ID { txn.approximateInMemInsertSize += uint64(bat.Size()) @@ -1074,7 +1082,7 @@ func (txn *Transaction) genRowId() types.Rowid { return types.DecodeFixed[types.Rowid](types.EncodeSlice(txn.rowId[:])) } -func (txn *Transaction) mergeTxnWorkspaceLocked(ctx context.Context) error { +func (txn *Transaction) MergeTxnWorkspaceLocked(ctx context.Context) error { txn.restoreTxnTableFunc = txn.restoreTxnTableFunc[:0] if len(txn.batchSelectList) > 0 { @@ -1325,7 +1333,7 @@ func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error) { return nil, err } - if err := txn.mergeTxnWorkspaceLocked(ctx); err != nil { + if err := txn.MergeTxnWorkspaceLocked(ctx); err != nil { return nil, err } if err := txn.dumpBatchLocked(ctx, -1); err != nil { diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index 862582fdd345c..ac0895cd4afc1 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -558,7 +558,7 @@ func (txn *Transaction) IncrStatementID(ctx context.Context, commit bool) error txn.Lock() defer txn.Unlock() //merge writes for the last statement - if err := txn.mergeTxnWorkspaceLocked(ctx); err != nil { + if err := txn.MergeTxnWorkspaceLocked(ctx); err != nil { return err } // dump batch to s3, starting from 0 (begining of the workspace) diff --git a/pkg/vm/engine/test/reader_test.go b/pkg/vm/engine/test/reader_test.go index 37eeebf8b18b5..080b5c96bd716 100644 --- a/pkg/vm/engine/test/reader_test.go +++ b/pkg/vm/engine/test/reader_test.go @@ -172,7 +172,6 @@ func Test_ReaderCanReadRangesBlocksWithoutDeletes(t *testing.T) { } func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { - t.Skip("not finished") var ( err error mp *mpool.MPool @@ -212,14 +211,32 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { rowsCount := 10 bat1 := catalog2.MockBatch(schema, rowsCount) - - // write table + // insert rows, and delete ,then read. 
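// Editor's note: this commit also renames mergeTxnWorkspaceLocked to the
// exported MergeTxnWorkspaceLocked, so the engine tests below, which live
// outside package disttae, can force a workspace merge before reading their
// own uncommitted writes. The call pattern used further down is a type
// assertion on the transaction's workspace; a hedged sketch of that pattern
// (error handling shortened, and the assertion assumes the workspace concrete
// type is *disttae.Transaction, as it is in these tests):
//
//	ws, ok := txn.GetWorkspace().(*disttae.Transaction)
//	if !ok {
//		t.Fatal("workspace is not a *disttae.Transaction")
//	}
//	require.NoError(t, ws.MergeTxnWorkspaceLocked(ctx))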
{ _, relation, txn, err = disttaeEngine.GetTable(ctx, databaseName, tableName) require.NoError(t, err) require.NoError(t, relation.Write(ctx, containers.ToCNBatch(bat1))) + txn.GetWorkspace().UpdateSnapshotWriteOffset() + + reader, err := testutil.GetRelationReader( + ctx, + disttaeEngine, + txn, + relation, + nil, + mp, + t, + ) + require.NoError(t, err) + + ret := testutil.EmptyBatchFromSchema(schema, primaryKeyIdx) + _, err = reader.Read(ctx, ret.Attrs, nil, mp, ret) + require.NoError(t, err) + require.Equal(t, rowsCount, int(ret.RowCount())) + reader.Close() + var bat2 *batch.Batch txn.GetWorkspace().(*disttae.Transaction).ForEachTableWrites( relation.GetDBID(ctx), relation.GetTableID(ctx), 1, func(entry disttae.Entry) { @@ -228,35 +245,128 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { bat2 = batch.NewWithSize(1) bat2.Vecs[0] = vector.NewVec(types.T_Rowid.ToType()) require.NoError(t, vector.AppendFixedList[types.Rowid](bat2.Vecs[0], waitedDeletes, nil, mp)) + bat2.SetRowCount(len(waitedDeletes)) }) require.NoError(t, relation.Delete(ctx, bat2, catalog.Row_ID)) - } - expr := []*plan.Expr{ - readutil.MakeFunctionExprForTest("=", []*plan.Expr{ - readutil.MakeColExprForTest(int32(primaryKeyIdx), schema.ColDefs[primaryKeyIdx].Type.Oid, schema.ColDefs[primaryKeyIdx].Name), - plan2.MakePlan2Int64ConstExprWithType(bat1.Vecs[primaryKeyIdx].Get(9).(int64)), - }), + expr := []*plan.Expr{ + readutil.MakeFunctionExprForTest("=", []*plan.Expr{ + readutil.MakeColExprForTest(int32(primaryKeyIdx), + schema.ColDefs[primaryKeyIdx].Type.Oid, schema.ColDefs[primaryKeyIdx].Name), + plan2.MakePlan2Int64ConstExprWithType(bat1.Vecs[primaryKeyIdx].Get(9).(int64)), + }), + } + + //Noticed that we should merge the workspace before read. + require.NoError(t, txn.GetWorkspace().(*disttae.Transaction).MergeTxnWorkspaceLocked(ctx)) + + reader, err = testutil.GetRelationReader( + ctx, + disttaeEngine, + txn, + relation, + expr, + mp, + t, + ) + require.NoError(t, err) + + ret = testutil.EmptyBatchFromSchema(schema, primaryKeyIdx) + _, err = reader.Read(ctx, ret.Attrs, expr[0], mp, ret) + require.NoError(t, err) + + require.Equal(t, 1, int(ret.RowCount())) + reader.Close() + require.NoError(t, txn.Commit(ctx)) } - reader, err := testutil.GetRelationReader( - ctx, - disttaeEngine, - txn, - relation, - expr, - mp, - t, - ) - require.NoError(t, err) + //Add UT for issue # 21179, txn can read more than 8192 uncommitted rows. + { + rowsOfBlock := 8192 + bat := catalog2.MockBatch(schema, rowsOfBlock*2) + bats := bat.Split(2) - ret := testutil.EmptyBatchFromSchema(schema, primaryKeyIdx) - _, err = reader.Read(ctx, ret.Attrs, expr[0], mp, ret) - require.NoError(t, err) + _, relation, txn, err = disttaeEngine.GetTable(ctx, databaseName, tableName) + require.NoError(t, err) + + require.NoError(t, relation.Write(ctx, containers.ToCNBatch(bats[0]))) + + require.NoError(t, relation.Write(ctx, containers.ToCNBatch(bats[1]))) + + txn.GetWorkspace().UpdateSnapshotWriteOffset() + + reader, err := testutil.GetRelationReader( + ctx, + disttaeEngine, + txn, + relation, + nil, + mp, + t, + ) + require.NoError(t, err) + + ret := testutil.EmptyBatchFromSchema(schema, primaryKeyIdx) + cnt := 0 + for { + isEnd, err := reader.Read(ctx, ret.Attrs, nil, mp, ret) + require.NoError(t, err) + if isEnd { + break + } + cnt += ret.RowCount() + } + require.Equal(t, rowsOfBlock*2, cnt) + + //delete rows which belongs to the same block. 
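// Editor's note: the drain-the-reader loop above (call Read until isEnd,
// summing ret.RowCount()) appears several times in this test. A hedged sketch
// of a helper that could factor it out; the name countRows is an assumption
// of this note, not part of the patch, and it presumes the reader returned by
// testutil.GetRelationReader satisfies engine.Reader with the Read signature
// used above:
//
//	func countRows(ctx context.Context, t *testing.T, r engine.Reader,
//		ret *batch.Batch, mp *mpool.MPool) int {
//		cnt := 0
//		for {
//			isEnd, err := r.Read(ctx, ret.Attrs, nil, mp, ret)
//			require.NoError(t, err)
//			if isEnd {
//				return cnt
//			}
//			cnt += ret.RowCount()
//		}
//	}
//
// The hunk continues below with the rowid delete batches announced by the
// preceding comment.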
+ var bat2 *batch.Batch + var bat3 *batch.Batch + txn.GetWorkspace().(*disttae.Transaction).ForEachTableWrites( + relation.GetDBID(ctx), relation.GetTableID(ctx), 1, func(entry disttae.Entry) { + waitedDeletes := vector.MustFixedColWithTypeCheck[types.Rowid](entry.Bat().GetVector(0)) + waitedDeletes1 := waitedDeletes[:rowsOfBlock/2] + bat2 = batch.NewWithSize(1) + bat2.Vecs[0] = vector.NewVec(types.T_Rowid.ToType()) + require.NoError(t, vector.AppendFixedList[types.Rowid](bat2.Vecs[0], waitedDeletes1, nil, mp)) + + waitedDeletes2 := waitedDeletes[rowsOfBlock/2 : rowsOfBlock] + bat3 = batch.NewWithSize(1) + bat3.Vecs[0] = vector.NewVec(types.T_Rowid.ToType()) + require.NoError(t, vector.AppendFixedList[types.Rowid](bat2.Vecs[0], waitedDeletes2, nil, mp)) + + }) + + require.NoError(t, relation.Delete(ctx, bat2, catalog.Row_ID)) + //Noticed that we should merge the workspace before read. + require.NoError(t, txn.GetWorkspace().(*disttae.Transaction).MergeTxnWorkspaceLocked(ctx)) + + reader, err = testutil.GetRelationReader( + ctx, + disttaeEngine, + txn, + relation, + nil, + mp, + t, + ) + require.NoError(t, err) + + ret = testutil.EmptyBatchFromSchema(schema, primaryKeyIdx) + cnt = 0 + for { + isEnd, err := reader.Read(ctx, ret.Attrs, nil, mp, ret) + require.NoError(t, err) + if isEnd { + break + } + cnt += ret.RowCount() + } + require.Equal(t, rowsOfBlock, cnt) + reader.Close() + + } - require.Equal(t, 1, int(ret.RowCount())) - require.NoError(t, txn.Commit(ctx)) } func Test_ReaderCanReadCommittedInMemInsertAndDeletes(t *testing.T) { From 7719e8a622917226f7b7898148ab16b7497e0f28 Mon Sep 17 00:00:00 2001 From: triump2020 Date: Fri, 17 Jan 2025 00:06:18 +0800 Subject: [PATCH 2/2] update --- pkg/vm/engine/test/reader_test.go | 55 ++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/pkg/vm/engine/test/reader_test.go b/pkg/vm/engine/test/reader_test.go index 080b5c96bd716..49c2b38e4c66c 100644 --- a/pkg/vm/engine/test/reader_test.go +++ b/pkg/vm/engine/test/reader_test.go @@ -250,12 +250,16 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { require.NoError(t, relation.Delete(ctx, bat2, catalog.Row_ID)) + //expr := []*plan.Expr{ + // readutil.MakeFunctionExprForTest("=", []*plan.Expr{ + // readutil.MakeColExprForTest(int32(primaryKeyIdx), + // schema.ColDefs[primaryKeyIdx].Type.Oid), + // plan2.MakePlan2Int64ConstExprWithType(bat1.Vecs[primaryKeyIdx].Get(9).(int64)), + // }), + //} + inVec := bat1.Vecs[primaryKeyIdx].Window(rowsCount/2, 1) expr := []*plan.Expr{ - readutil.MakeFunctionExprForTest("=", []*plan.Expr{ - readutil.MakeColExprForTest(int32(primaryKeyIdx), - schema.ColDefs[primaryKeyIdx].Type.Oid, schema.ColDefs[primaryKeyIdx].Name), - plan2.MakePlan2Int64ConstExprWithType(bat1.Vecs[primaryKeyIdx].Get(9).(int64)), - }), + readutil.ConstructInExpr(ctx, schema.GetPrimaryKey().Name, inVec.GetDownstreamVector()), } //Noticed that we should merge the workspace before read. @@ -290,13 +294,38 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { _, relation, txn, err = disttaeEngine.GetTable(ctx, databaseName, tableName) require.NoError(t, err) + // read the count of rows in the relation. 
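// Editor's note: the second commit replaces the hand-built "=" filter with
// readutil.ConstructInExpr over a one-row window of the primary-key column,
// i.e. an IN filter whose value list is taken directly from the inserted
// batch. A hedged restatement of that pattern as it appears above (the window
// start rowsCount/2 just picks one existing pk value; nothing more is
// implied):
//
//	// take a single pk value from the rows written in this transaction ...
//	inVec := bat1.Vecs[primaryKeyIdx].Window(rowsCount/2, 1)
//	// ... and build `pk IN (<that value>)` as the reader filter
//	expr := []*plan.Expr{
//		readutil.ConstructInExpr(ctx, schema.GetPrimaryKey().Name,
//			inVec.GetDownstreamVector()),
//	}
//
// The hunk below then counts the rows already visible to the transaction, so
// the later assertions can add that baseline to the expected totals.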
+ reader, err := testutil.GetRelationReader( + ctx, + disttaeEngine, + txn, + relation, + nil, + mp, + t, + ) + require.NoError(t, err) + + ret := testutil.EmptyBatchFromSchema(schema, primaryKeyIdx) + rows := 0 + for { + isEnd, err := reader.Read(ctx, ret.Attrs, nil, mp, ret) + require.NoError(t, err) + if isEnd { + break + } + rows += ret.RowCount() + } + require.NoError(t, err) + reader.Close() + require.NoError(t, relation.Write(ctx, containers.ToCNBatch(bats[0]))) require.NoError(t, relation.Write(ctx, containers.ToCNBatch(bats[1]))) txn.GetWorkspace().UpdateSnapshotWriteOffset() - reader, err := testutil.GetRelationReader( + reader, err = testutil.GetRelationReader( ctx, disttaeEngine, txn, @@ -307,7 +336,7 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { ) require.NoError(t, err) - ret := testutil.EmptyBatchFromSchema(schema, primaryKeyIdx) + ret = testutil.EmptyBatchFromSchema(schema, primaryKeyIdx) cnt := 0 for { isEnd, err := reader.Read(ctx, ret.Attrs, nil, mp, ret) @@ -317,7 +346,7 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { } cnt += ret.RowCount() } - require.Equal(t, rowsOfBlock*2, cnt) + require.Equal(t, rowsOfBlock*2+rows, cnt) //delete rows which belongs to the same block. var bat2 *batch.Batch @@ -329,15 +358,17 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { bat2 = batch.NewWithSize(1) bat2.Vecs[0] = vector.NewVec(types.T_Rowid.ToType()) require.NoError(t, vector.AppendFixedList[types.Rowid](bat2.Vecs[0], waitedDeletes1, nil, mp)) + bat2.SetRowCount(len(waitedDeletes1)) waitedDeletes2 := waitedDeletes[rowsOfBlock/2 : rowsOfBlock] bat3 = batch.NewWithSize(1) bat3.Vecs[0] = vector.NewVec(types.T_Rowid.ToType()) - require.NoError(t, vector.AppendFixedList[types.Rowid](bat2.Vecs[0], waitedDeletes2, nil, mp)) - + require.NoError(t, vector.AppendFixedList[types.Rowid](bat3.Vecs[0], waitedDeletes2, nil, mp)) + bat3.SetRowCount(len(waitedDeletes2)) }) require.NoError(t, relation.Delete(ctx, bat2, catalog.Row_ID)) + require.NoError(t, relation.Delete(ctx, bat3, catalog.Row_ID)) //Noticed that we should merge the workspace before read. require.NoError(t, txn.GetWorkspace().(*disttae.Transaction).MergeTxnWorkspaceLocked(ctx)) @@ -362,9 +393,9 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { } cnt += ret.RowCount() } - require.Equal(t, rowsOfBlock, cnt) + require.Equal(t, rowsOfBlock+rows, cnt) reader.Close() - + require.NoError(t, txn.Commit(ctx)) } }
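Editor's note: after the second commit the large-insert block of the test no
longer assumes the relation is empty when it starts. It first drains a reader
to record the pre-existing row count, then checks the totals after the two
8192-row writes and again after deleting both halves of the first block. The
same commit fixes the copy-paste bug where waitedDeletes2 was appended to
bat2.Vecs[0] instead of bat3.Vecs[0], sets the row counts on both delete
batches, issues the second Delete call for bat3, and commits the transaction
at the end. A minimal sketch of the row accounting the new assertions rely on;
preExisting stands for whatever the first reader pass counted, the other
numbers come from the test itself:

	rows := preExisting                      // first drain of the reader
	rowsOfBlock := 8192
	inserted := rowsOfBlock * 2              // bats[0] and bats[1], uncommitted
	deleted := rowsOfBlock/2 + rowsOfBlock/2 // bat2 + bat3, one block in total
	remaining := rows + inserted - deleted
	// remaining == rows + rowsOfBlock, matching require.Equal(t, rowsOfBlock+rows, cnt)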