diff --git a/pkg/vm/engine/tae/catalog/basemvccnode.go b/pkg/vm/engine/tae/catalog/basemvccnode.go index 812b44134bbad..ec9eb7608f33a 100644 --- a/pkg/vm/engine/tae/catalog/basemvccnode.go +++ b/pkg/vm/engine/tae/catalog/basemvccnode.go @@ -179,10 +179,10 @@ func (un EntryMVCCNode) AppendTupleWithCommitTS(bat *containers.Batch, ts types. ) } -func ReadEntryNodeTuple(bat *containers.Batch, row int) (un *EntryMVCCNode) { +func ReadEntryNodeTuple(createAt, deleteAt types.TS) (un *EntryMVCCNode) { un = &EntryMVCCNode{ - CreatedAt: bat.GetVectorByName(EntryNode_CreateAt).Get(row).(types.TS), - DeletedAt: bat.GetVectorByName(EntryNode_DeleteAt).Get(row).(types.TS), + CreatedAt: createAt, + DeletedAt: deleteAt, } return } diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 95bf7aed87b45..3d18f37af297d 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -20,6 +20,7 @@ import ( pkgcatalog "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" @@ -33,6 +34,10 @@ const ( Backup_Object_Offset uint16 = 1000 ) +type ObjectListReplayer interface { + Submit(uint64, func()) +} + //#region Replay WAL related func (catalog *Catalog) ReplayCmd( @@ -421,27 +426,35 @@ func (catalog *Catalog) onReplayCreateTable(dbid, tid uint64, schema *Schema, tx tbl.InsertLocked(un) } -func (catalog *Catalog) OnReplayObjectBatch(objectInfo *containers.Batch, isTombstone bool, dataFactory DataFactory, forSys bool) { - for i := 0; i < objectInfo.Length(); i++ { - tid := objectInfo.GetVectorByName(SnapshotAttr_TID).Get(i).(uint64) +func (catalog *Catalog) OnReplayObjectBatch(replayer ObjectListReplayer, objectInfo *containers.Batch, isTombstone bool, dataFactory DataFactory, forSys bool) { + tids := vector.MustFixedColNoTypeCheck[uint64](objectInfo.GetVectorByName(SnapshotAttr_TID).GetDownstreamVector()) + dbids := vector.MustFixedColNoTypeCheck[uint64](objectInfo.GetVectorByName(SnapshotAttr_DBID).GetDownstreamVector()) + commitTSs := vector.MustFixedColNoTypeCheck[types.TS](objectInfo.GetVectorByName(txnbase.SnapshotAttr_CommitTS).GetDownstreamVector()) + prepareTSs := vector.MustFixedColNoTypeCheck[types.TS](objectInfo.GetVectorByName(txnbase.SnapshotAttr_PrepareTS).GetDownstreamVector()) + startTSs := vector.MustFixedColNoTypeCheck[types.TS](objectInfo.GetVectorByName(txnbase.SnapshotAttr_StartTS).GetDownstreamVector()) + createTSs := vector.MustFixedColNoTypeCheck[types.TS](objectInfo.GetVectorByName(EntryNode_CreateAt).GetDownstreamVector()) + deleteTSs := vector.MustFixedColNoTypeCheck[types.TS](objectInfo.GetVectorByName(EntryNode_DeleteAt).GetDownstreamVector()) + for i, tid := range tids { if forSys != pkgcatalog.IsSystemTable(tid) { continue } - dbid := objectInfo.GetVectorByName(SnapshotAttr_DBID).Get(i).(uint64) - objectNode := ReadObjectInfoTuple(objectInfo, i) - sid := objectNode.ObjectName().ObjectId() - txnNode := txnbase.ReadTuple(objectInfo, i) - entryNode := ReadEntryNodeTuple(objectInfo, i) - catalog.onReplayCheckpointObject(dbid, tid, sid, objectNode, entryNode, txnNode, isTombstone, dataFactory) + replayFn := func() { + dbid := dbids[i] + objectNode := ReadObjectInfoTuple(objectInfo, i) + sid := objectNode.ObjectName().ObjectId() + catalog.onReplayCheckpointObject( + dbid, tid, sid, createTSs[i], deleteTSs[i], startTSs[i], prepareTSs[i], commitTSs[i], objectNode, isTombstone, dataFactory) + } + replayer.Submit(tid, replayFn) } } func (catalog *Catalog) onReplayCheckpointObject( dbid, tbid uint64, objid *types.Objectid, + createTS, deleteTS types.TS, + start, prepare, end types.TS, objNode *ObjectMVCCNode, - entryNode *EntryMVCCNode, - txnNode *txnbase.TxnMVCCNode, isTombstone bool, dataFactory DataFactory, ) { @@ -470,35 +483,49 @@ func (catalog *Catalog) onReplayCheckpointObject( SortHint: catalog.NextObject(), IsTombstone: isTombstone, } - object.EntryMVCCNode = *entryNode + object.EntryMVCCNode = EntryMVCCNode{ + CreatedAt: createTS, + DeletedAt: deleteTS, + } object.ObjectMVCCNode = *objNode - object.CreateNode = *txnNode + object.CreateNode = txnbase.TxnMVCCNode{ + Start: start, + Prepare: prepare, + End: end, + } object.ObjectState = ObjectState_Create_ApplyCommit object.forcePNode = true // any object replayed from checkpoint is forced to be created return object } var obj *ObjectEntry - if entryNode.CreatedAt.Equal(&txnNode.End) { + if createTS.Equal(&end) { obj = newObject() rel.AddEntryLocked(obj) } - if entryNode.DeletedAt.Equal(&txnNode.End) { + if deleteTS.Equal(&end) { obj, err = rel.GetObjectByID(objid, isTombstone) if err != nil { - panic(fmt.Sprintf("obj %v(%v), [%v %v %v] not existed, table:\n%v", objid.String(), - entryNode.String(), isTombstone, objNode.String(), - txnNode.String(), rel.StringWithLevel(3))) + panic(fmt.Sprintf("obj %v(%v %v), [%v %v %v %v %v] not existed, table:\n%v", objid.String(), + createTS.ToString(), deleteTS.ToString(), isTombstone, objNode.String(), + start.ToString(), prepare.ToString(), end.ToString(), rel.StringWithLevel(3))) + } + obj.EntryMVCCNode = EntryMVCCNode{ + CreatedAt: createTS, + DeletedAt: deleteTS, } - obj.EntryMVCCNode = *entryNode obj.ObjectMVCCNode = *objNode - obj.DeleteNode = *txnNode + obj.DeleteNode = txnbase.TxnMVCCNode{ + Start: start, + Prepare: prepare, + End: end, + } obj.ObjectState = ObjectState_Delete_ApplyCommit } - if !entryNode.CreatedAt.Equal(&txnNode.End) && !entryNode.DeletedAt.Equal(&txnNode.End) { + if !createTS.Equal(&end) && !deleteTS.Equal(&end) { // In back up, aobj is replaced with naobj and its DeleteAt is removed. // Before back up, txnNode.End equals DeleteAt of naobj. // After back up, DeleteAt is empty. - if objid.Offset() == Backup_Object_Offset && entryNode.DeletedAt.IsEmpty() { + if objid.Offset() == Backup_Object_Offset && deleteTS.IsEmpty() { obj = newObject() rel.AddEntryLocked(obj) _, sarg, _ := fault.TriggerFault("back up UT") @@ -506,29 +533,29 @@ func (catalog *Catalog) onReplayCheckpointObject( obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt) } logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v", - objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(), - entryNode.DeletedAt.ToString(), txnNode.End.ToString()) + objid.String(), rel.fullName, rel.ID, createTS.ToString(), + deleteTS.ToString(), end.ToString()) } else { - if !entryNode.DeletedAt.IsEmpty() { + if !deleteTS.IsEmpty() { logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v", - objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(), - entryNode.DeletedAt.ToString(), txnNode.End.ToString()) + objid.String(), rel.fullName, rel.ID, createTS.ToString(), + deleteTS.ToString(), end.ToString()) obj, _ = rel.GetObjectByID(objid, isTombstone) if obj == nil { obj = newObject() rel.AddEntryLocked(obj) } - obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(entryNode.CreatedAt) - obj.DeleteNode = *txnbase.NewTxnMVCCNodeWithTS(entryNode.DeletedAt) + obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(createTS) + obj.DeleteNode = *txnbase.NewTxnMVCCNodeWithTS(deleteTS) } } } if obj == nil { obj, err = rel.GetObjectByID(objid, isTombstone) if err != nil { - panic(fmt.Sprintf("obj %v(%v), [%v %v %v] not existed, table:\n%v", objid.String(), - entryNode.String(), isTombstone, objNode.String(), - txnNode.String(), rel.StringWithLevel(3))) + panic(fmt.Sprintf("obj %v(%v %v), [%v %v %v %v %v] not existed, table:\n%v", objid.String(), + createTS.ToString(), deleteTS.ToString(), isTombstone, objNode.String(), + start.ToString(), prepare.ToString(), end.ToString(), rel.StringWithLevel(3))) } } if obj.objData == nil { diff --git a/pkg/vm/engine/tae/catalog/metamvccnode.go b/pkg/vm/engine/tae/catalog/metamvccnode.go index 96aeebe96f6d1..858abc36ff57c 100644 --- a/pkg/vm/engine/tae/catalog/metamvccnode.go +++ b/pkg/vm/engine/tae/catalog/metamvccnode.go @@ -173,8 +173,10 @@ func (e *ObjectMVCCNode) AppendTuple(sid *types.Objectid, batch *containers.Batc func ReadObjectInfoTuple(bat *containers.Batch, row int) (e *ObjectMVCCNode) { buf := bat.GetVectorByName(ObjectAttr_ObjectStats).Get(row).([]byte) + buf2 := make([]byte, len(buf)) + copy(buf2, buf) e = &ObjectMVCCNode{ - ObjectStats: (objectio.ObjectStats)(buf), + ObjectStats: (objectio.ObjectStats)(buf2), } return } diff --git a/pkg/vm/engine/tae/db/checkpoint/replay.go b/pkg/vm/engine/tae/db/checkpoint/replay.go index 2b25566f20cba..116021cb6014e 100644 --- a/pkg/vm/engine/tae/db/checkpoint/replay.go +++ b/pkg/vm/engine/tae/db/checkpoint/replay.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "sort" + "sync" "time" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -30,6 +31,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables" @@ -43,6 +45,10 @@ const ( ReadData ) +const ( + DefaultObjectReplayWorkerCount = 10 +) + type CkpReplayer struct { r *runner dataF catalog.DataFactory @@ -55,12 +61,24 @@ type CkpReplayer struct { readDuration, applyDuration time.Duration readCount, applyCount, totalCount int + + objectReplayWorker []sm.Queue + wg sync.WaitGroup + objectCountMap map[uint64]int } func (c *CkpReplayer) Close() { for _, close := range c.closes { close() } + for _, worker := range c.objectReplayWorker { + worker.Stop() + } + for _, data := range c.ckpdatas { + if data != nil { + data.Close() + } + } } func (c *CkpReplayer) ReadCkpFiles() (err error) { @@ -187,7 +205,7 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) { closecbs := make([]func(), 0) c.totalCount = len(entries) - readfn := func(i int, readType uint16) { + readfn := func(i int, readType uint16) (err error) { checkpointEntry := entries[i] checkpointEntry.sid = r.rt.SID() if checkpointEntry.end.LT(&maxGlobalEnd) { @@ -218,6 +236,7 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) { closecbs = append(closecbs, func() { datas[i].CloseWhenLoadFromCache(checkpointEntry.version) }) } } + return nil } c.closes = append(c.closes, closecbs...) t0 = time.Now() @@ -229,16 +248,24 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) { } } for i := 0; i < bat.Length(); i++ { - readfn(i, PrefetchMetaIdx) + if err = readfn(i, PrefetchMetaIdx); err != nil { + return + } } for i := 0; i < bat.Length(); i++ { - readfn(i, ReadMetaIdx) + if err = readfn(i, ReadMetaIdx); err != nil { + return + } } for i := 0; i < bat.Length(); i++ { - readfn(i, PrefetchData) + if err = readfn(i, PrefetchData); err != nil { + return + } } for i := 0; i < bat.Length(); i++ { - readfn(i, ReadData) + if err = readfn(i, ReadData); err != nil { + return + } } c.ckpdatas = datas c.readDuration += time.Since(t0) @@ -286,7 +313,7 @@ func (c *CkpReplayer) ReplayThreeTablesObjectlist(phase string) ( dataFactory := c.dataF maxGlobal := r.MaxGlobalCheckpoint() if maxGlobal != nil { - err = datas[c.globalCkpIdx].ApplyReplayTo(r.catalog, dataFactory, true) + err = datas[c.globalCkpIdx].ApplyReplayTo(c, r.catalog, dataFactory, true) c.applyCount++ logger := logutil.Info if err != nil { @@ -332,7 +359,7 @@ func (c *CkpReplayer) ReplayThreeTablesObjectlist(phase string) ( continue } start := time.Now() - if err = datas[i].ApplyReplayTo(r.catalog, dataFactory, true); err != nil { + if err = datas[i].ApplyReplayTo(c, r.catalog, dataFactory, true); err != nil { logger = logutil.Error } logger( @@ -363,6 +390,7 @@ func (c *CkpReplayer) ReplayThreeTablesObjectlist(phase string) ( isLSNValid = false } } + c.wg.Wait() return } @@ -419,7 +447,7 @@ func (c *CkpReplayer) ReplayObjectlist(phase string) (err error) { var ckpVers []uint32 var ckpDatas []*logtail.CheckpointData if maxGlobal := r.MaxGlobalCheckpoint(); maxGlobal != nil { - err = datas[c.globalCkpIdx].ApplyReplayTo(r.catalog, dataFactory, false) + err = datas[c.globalCkpIdx].ApplyReplayTo(c, r.catalog, dataFactory, false) if err != nil { return } @@ -436,6 +464,7 @@ func (c *CkpReplayer) ReplayObjectlist(phase string) (err error) { continue } err = datas[i].ApplyReplayTo( + c, r.catalog, dataFactory, false) @@ -448,12 +477,22 @@ func (c *CkpReplayer) ReplayObjectlist(phase string) (err error) { ckpVers = append(ckpVers, checkpointEntry.version) ckpDatas = append(ckpDatas, datas[i]) } + c.wg.Wait() c.applyDuration += time.Since(t0) r.catalog.GetUsageMemo().(*logtail.TNUsageMemo).PrepareReplay(ckpDatas, ckpVers) r.source.Init(maxTs) + maxTableID, maxObjectCount := uint64(0), 0 + for tid, count := range c.objectCountMap { + if count > maxObjectCount { + maxTableID = tid + maxObjectCount = count + } + } logutil.Info( "Replay-Checkpoints", zap.String("phase", phase), + zap.Uint64("max table tid", maxTableID), + zap.Int("object count (create count + delete count)", maxObjectCount), zap.Duration("apply-cost", c.applyDuration), zap.Duration("read-cost", c.readDuration), zap.Int("apply-count", c.applyCount), @@ -463,11 +502,32 @@ func (c *CkpReplayer) ReplayObjectlist(phase string) (err error) { return } +func (c *CkpReplayer) Submit(tid uint64, replayFn func()) { + c.wg.Add(1) + workerOffset := tid % uint64(len(c.objectReplayWorker)) + c.objectCountMap[tid] = c.objectCountMap[tid] + 1 + c.objectReplayWorker[workerOffset].Enqueue(replayFn) +} + func (r *runner) Replay(dataFactory catalog.DataFactory) *CkpReplayer { - return &CkpReplayer{ - r: r, - dataF: dataFactory, + replayer := &CkpReplayer{ + r: r, + dataF: dataFactory, + objectCountMap: make(map[uint64]int), + } + objectWorker := make([]sm.Queue, DefaultObjectReplayWorkerCount) + for i := 0; i < DefaultObjectReplayWorkerCount; i++ { + objectWorker[i] = sm.NewSafeQueue(10000, 100, func(items ...any) { + for _, item := range items { + fn := item.(func()) + fn() + replayer.wg.Done() + } + }) + objectWorker[i].Start() } + replayer.objectReplayWorker = objectWorker + return replayer } func MergeCkpMeta( diff --git a/pkg/vm/engine/tae/db/open.go b/pkg/vm/engine/tae/db/open.go index 95bee1419e0ed..25ef723b7d7e4 100644 --- a/pkg/vm/engine/tae/db/open.go +++ b/pkg/vm/engine/tae/db/open.go @@ -221,6 +221,7 @@ func Open( now := time.Now() ckpReplayer := db.BGCheckpointRunner.Replay(dataFactory) + defer ckpReplayer.Close() if err = ckpReplayer.ReadCkpFiles(); err != nil { panic(err) } diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 0f4d7833826ac..439c75bb0f49a 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -10502,7 +10502,55 @@ func TestDedup5(t *testing.T) { assert.Error(t, err) assert.NoError(t, insertTxn.Commit(ctx)) } - +func TestCheckpointObjectList(t *testing.T) { + ctx := context.Background() + opts := config.WithLongScanAndCKPOpts(nil) + tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer tae.Close() + txn, _ := tae.StartTxn(nil) + testutil.CreateDatabase2(ctx, txn, "db") + txn.Commit(ctx) + var wg sync.WaitGroup + pool, _ := ants.NewPool(80) + defer pool.Release() + var tblNameIndex atomic.Int32 + createRelAndAppend := func() { + defer wg.Done() + schema := catalog.MockSchemaAll(3, -1) + schema.Name = fmt.Sprintf("tbl%d", tblNameIndex.Add(1)) + schema.Extra.BlockMaxRows = 1 + schema.Extra.ObjectMaxBlocks = 256 + bat := catalog.MockBatch(schema, 1) + txn, _ := tae.StartTxn(nil) + db, _ := txn.GetDatabase("db") + testutil.CreateRelation2(ctx, txn, db, schema) + txn.Commit(ctx) + for i := 0; i < 10; i++ { + txn, rel := testutil.GetRelation(t, 0, tae.DB, "db", schema.Name) + rel.Append(ctx, bat) + txn.Commit(ctx) + testutil.CompactBlocks(t, 0, tae.DB, "db", schema, true) + } + bat.Close() + } + for i := 0; i < 10; i++ { + wg.Add(1) + pool.Submit(createRelAndAppend) + } + wg.Wait() + tae.ForceCheckpoint() + for i := 1; i <= 10; i++ { + txn, rel := testutil.GetRelation(t, 0, tae.DB, "db", fmt.Sprintf("tbl%d", i)) + testutil.CheckAllColRowsByScan(t, rel, 10, false) + txn.Commit(ctx) + } + tae.Restart(ctx) + for i := 1; i <= 10; i++ { + txn, rel := testutil.GetRelation(t, 0, tae.DB, "db", fmt.Sprintf("tbl%d", i)) + testutil.CheckAllColRowsByScan(t, rel, 10, false) + txn.Commit(ctx) + } +} func TestReplayDebugLog(t *testing.T) { ctx := context.Background() diff --git a/pkg/vm/engine/tae/logtail/utils.go b/pkg/vm/engine/tae/logtail/utils.go index 37523aa19081d..5d063a20a5fd1 100644 --- a/pkg/vm/engine/tae/logtail/utils.go +++ b/pkg/vm/engine/tae/logtail/utils.go @@ -492,14 +492,15 @@ func NewGlobalCollector( } func (data *CheckpointData) ApplyReplayTo( + objectlistReplayer catalog.ObjectListReplayer, c *catalog.Catalog, dataFactory catalog.DataFactory, forSys bool, ) (err error) { objectInfo := data.GetTombstoneObjectBatchs() - c.OnReplayObjectBatch(objectInfo, true, dataFactory, forSys) + c.OnReplayObjectBatch(objectlistReplayer, objectInfo, true, dataFactory, forSys) objectInfo = data.GetObjectBatchs() - c.OnReplayObjectBatch(objectInfo, false, dataFactory, forSys) + c.OnReplayObjectBatch(objectlistReplayer, objectInfo, false, dataFactory, forSys) return } diff --git a/pkg/vm/engine/tae/txn/txnbase/mvccnode.go b/pkg/vm/engine/tae/txn/txnbase/mvccnode.go index fde44f6f237f8..5ab40bac85560 100644 --- a/pkg/vm/engine/tae/txn/txnbase/mvccnode.go +++ b/pkg/vm/engine/tae/txn/txnbase/mvccnode.go @@ -464,15 +464,3 @@ func (un *TxnMVCCNode) AppendTupleWithCommitTS(bat *containers.Batch, commitTS t func (un *TxnMVCCNode) ReadTuple(bat *containers.Batch, offset int) { // TODO } - -func ReadTuple(bat *containers.Batch, row int) (un *TxnMVCCNode) { - end := bat.GetVectorByName(SnapshotAttr_CommitTS).Get(row).(types.TS) - start := bat.GetVectorByName(SnapshotAttr_StartTS).Get(row).(types.TS) - prepare := bat.GetVectorByName(SnapshotAttr_PrepareTS).Get(row).(types.TS) - un = &TxnMVCCNode{ - Start: start, - Prepare: prepare, - End: end, - } - return -}