Skip to content

Commit

Permalink
Optimize replay (#20837)
Browse files Browse the repository at this point in the history
Optimize replay, replay checkpoint with 10 workers

Approved by: @XuPeng-SH
  • Loading branch information
jiangxinmeng1 authored Dec 20, 2024
1 parent c02cad7 commit f491bcd
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 62 deletions.
6 changes: 3 additions & 3 deletions pkg/vm/engine/tae/catalog/basemvccnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ func (un EntryMVCCNode) AppendTupleWithCommitTS(bat *containers.Batch, ts types.
)
}

func ReadEntryNodeTuple(bat *containers.Batch, row int) (un *EntryMVCCNode) {
func ReadEntryNodeTuple(createAt, deleteAt types.TS) (un *EntryMVCCNode) {
un = &EntryMVCCNode{
CreatedAt: bat.GetVectorByName(EntryNode_CreateAt).Get(row).(types.TS),
DeletedAt: bat.GetVectorByName(EntryNode_DeleteAt).Get(row).(types.TS),
CreatedAt: createAt,
DeletedAt: deleteAt,
}
return
}
Expand Down
91 changes: 59 additions & 32 deletions pkg/vm/engine/tae/catalog/catalogreplay.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
pkgcatalog "github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/util/fault"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
Expand All @@ -33,6 +34,10 @@ const (
Backup_Object_Offset uint16 = 1000
)

// ObjectListReplayer schedules object-replay closures. Submit takes a key
// (the table id) and a closure; implementations route closures with the
// same key to the same worker, so replay of one table stays ordered while
// different tables replay in parallel.
type ObjectListReplayer interface {
	Submit(uint64, func())
}

//#region Replay WAL related

func (catalog *Catalog) ReplayCmd(
Expand Down Expand Up @@ -421,27 +426,35 @@ func (catalog *Catalog) onReplayCreateTable(dbid, tid uint64, schema *Schema, tx
tbl.InsertLocked(un)
}

func (catalog *Catalog) OnReplayObjectBatch(objectInfo *containers.Batch, isTombstone bool, dataFactory DataFactory, forSys bool) {
for i := 0; i < objectInfo.Length(); i++ {
tid := objectInfo.GetVectorByName(SnapshotAttr_TID).Get(i).(uint64)
func (catalog *Catalog) OnReplayObjectBatch(replayer ObjectListReplayer, objectInfo *containers.Batch, isTombstone bool, dataFactory DataFactory, forSys bool) {
tids := vector.MustFixedColNoTypeCheck[uint64](objectInfo.GetVectorByName(SnapshotAttr_TID).GetDownstreamVector())
dbids := vector.MustFixedColNoTypeCheck[uint64](objectInfo.GetVectorByName(SnapshotAttr_DBID).GetDownstreamVector())
commitTSs := vector.MustFixedColNoTypeCheck[types.TS](objectInfo.GetVectorByName(txnbase.SnapshotAttr_CommitTS).GetDownstreamVector())
prepareTSs := vector.MustFixedColNoTypeCheck[types.TS](objectInfo.GetVectorByName(txnbase.SnapshotAttr_PrepareTS).GetDownstreamVector())
startTSs := vector.MustFixedColNoTypeCheck[types.TS](objectInfo.GetVectorByName(txnbase.SnapshotAttr_StartTS).GetDownstreamVector())
createTSs := vector.MustFixedColNoTypeCheck[types.TS](objectInfo.GetVectorByName(EntryNode_CreateAt).GetDownstreamVector())
deleteTSs := vector.MustFixedColNoTypeCheck[types.TS](objectInfo.GetVectorByName(EntryNode_DeleteAt).GetDownstreamVector())
for i, tid := range tids {
if forSys != pkgcatalog.IsSystemTable(tid) {
continue
}
dbid := objectInfo.GetVectorByName(SnapshotAttr_DBID).Get(i).(uint64)
objectNode := ReadObjectInfoTuple(objectInfo, i)
sid := objectNode.ObjectName().ObjectId()
txnNode := txnbase.ReadTuple(objectInfo, i)
entryNode := ReadEntryNodeTuple(objectInfo, i)
catalog.onReplayCheckpointObject(dbid, tid, sid, objectNode, entryNode, txnNode, isTombstone, dataFactory)
replayFn := func() {
dbid := dbids[i]
objectNode := ReadObjectInfoTuple(objectInfo, i)
sid := objectNode.ObjectName().ObjectId()
catalog.onReplayCheckpointObject(
dbid, tid, sid, createTSs[i], deleteTSs[i], startTSs[i], prepareTSs[i], commitTSs[i], objectNode, isTombstone, dataFactory)
}
replayer.Submit(tid, replayFn)
}
}

func (catalog *Catalog) onReplayCheckpointObject(
dbid, tbid uint64,
objid *types.Objectid,
createTS, deleteTS types.TS,
start, prepare, end types.TS,
objNode *ObjectMVCCNode,
entryNode *EntryMVCCNode,
txnNode *txnbase.TxnMVCCNode,
isTombstone bool,
dataFactory DataFactory,
) {
Expand Down Expand Up @@ -470,65 +483,79 @@ func (catalog *Catalog) onReplayCheckpointObject(
SortHint: catalog.NextObject(),
IsTombstone: isTombstone,
}
object.EntryMVCCNode = *entryNode
object.EntryMVCCNode = EntryMVCCNode{
CreatedAt: createTS,
DeletedAt: deleteTS,
}
object.ObjectMVCCNode = *objNode
object.CreateNode = *txnNode
object.CreateNode = txnbase.TxnMVCCNode{
Start: start,
Prepare: prepare,
End: end,
}
object.ObjectState = ObjectState_Create_ApplyCommit
object.forcePNode = true // any object replayed from checkpoint is forced to be created
return object
}
var obj *ObjectEntry
if entryNode.CreatedAt.Equal(&txnNode.End) {
if createTS.Equal(&end) {
obj = newObject()
rel.AddEntryLocked(obj)
}
if entryNode.DeletedAt.Equal(&txnNode.End) {
if deleteTS.Equal(&end) {
obj, err = rel.GetObjectByID(objid, isTombstone)
if err != nil {
panic(fmt.Sprintf("obj %v(%v), [%v %v %v] not existed, table:\n%v", objid.String(),
entryNode.String(), isTombstone, objNode.String(),
txnNode.String(), rel.StringWithLevel(3)))
panic(fmt.Sprintf("obj %v(%v %v), [%v %v %v %v %v] not existed, table:\n%v", objid.String(),
createTS.ToString(), deleteTS.ToString(), isTombstone, objNode.String(),
start.ToString(), prepare.ToString(), end.ToString(), rel.StringWithLevel(3)))
}
obj.EntryMVCCNode = EntryMVCCNode{
CreatedAt: createTS,
DeletedAt: deleteTS,
}
obj.EntryMVCCNode = *entryNode
obj.ObjectMVCCNode = *objNode
obj.DeleteNode = *txnNode
obj.DeleteNode = txnbase.TxnMVCCNode{
Start: start,
Prepare: prepare,
End: end,
}
obj.ObjectState = ObjectState_Delete_ApplyCommit
}
if !entryNode.CreatedAt.Equal(&txnNode.End) && !entryNode.DeletedAt.Equal(&txnNode.End) {
if !createTS.Equal(&end) && !deleteTS.Equal(&end) {
// In back up, aobj is replaced with naobj and its DeleteAt is removed.
// Before back up, txnNode.End equals DeleteAt of naobj.
// After back up, DeleteAt is empty.
if objid.Offset() == Backup_Object_Offset && entryNode.DeletedAt.IsEmpty() {
if objid.Offset() == Backup_Object_Offset && deleteTS.IsEmpty() {
obj = newObject()
rel.AddEntryLocked(obj)
_, sarg, _ := fault.TriggerFault("back up UT")
if sarg == "" {
obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt)
}
logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v",
objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(),
entryNode.DeletedAt.ToString(), txnNode.End.ToString())
objid.String(), rel.fullName, rel.ID, createTS.ToString(),
deleteTS.ToString(), end.ToString())
} else {
if !entryNode.DeletedAt.IsEmpty() {
if !deleteTS.IsEmpty() {
logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v",
objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(),
entryNode.DeletedAt.ToString(), txnNode.End.ToString())
objid.String(), rel.fullName, rel.ID, createTS.ToString(),
deleteTS.ToString(), end.ToString())
obj, _ = rel.GetObjectByID(objid, isTombstone)
if obj == nil {
obj = newObject()
rel.AddEntryLocked(obj)
}
obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(entryNode.CreatedAt)
obj.DeleteNode = *txnbase.NewTxnMVCCNodeWithTS(entryNode.DeletedAt)
obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(createTS)
obj.DeleteNode = *txnbase.NewTxnMVCCNodeWithTS(deleteTS)
}
}
}
if obj == nil {
obj, err = rel.GetObjectByID(objid, isTombstone)
if err != nil {
panic(fmt.Sprintf("obj %v(%v), [%v %v %v] not existed, table:\n%v", objid.String(),
entryNode.String(), isTombstone, objNode.String(),
txnNode.String(), rel.StringWithLevel(3)))
panic(fmt.Sprintf("obj %v(%v %v), [%v %v %v %v %v] not existed, table:\n%v", objid.String(),
createTS.ToString(), deleteTS.ToString(), isTombstone, objNode.String(),
start.ToString(), prepare.ToString(), end.ToString(), rel.StringWithLevel(3)))
}
}
if obj.objData == nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/vm/engine/tae/catalog/metamvccnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,10 @@ func (e *ObjectMVCCNode) AppendTuple(sid *types.Objectid, batch *containers.Batc

// ReadObjectInfoTuple decodes the ObjectStats column at the given row into
// a fresh ObjectMVCCNode. The stats bytes are cloned so the node does not
// alias the batch's backing buffer.
func ReadObjectInfoTuple(bat *containers.Batch, row int) (e *ObjectMVCCNode) {
	raw := bat.GetVectorByName(ObjectAttr_ObjectStats).Get(row).([]byte)
	cloned := make([]byte, len(raw))
	copy(cloned, raw)
	return &ObjectMVCCNode{
		ObjectStats: (objectio.ObjectStats)(cloned),
	}
}
Expand Down
82 changes: 71 additions & 11 deletions pkg/vm/engine/tae/db/checkpoint/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sort"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables"
Expand All @@ -43,6 +45,10 @@ const (
ReadData
)

const (
	// DefaultObjectReplayWorkerCount is the number of queue workers used to
	// replay checkpointed objects concurrently.
	DefaultObjectReplayWorkerCount = 10
)

type CkpReplayer struct {
r *runner
dataF catalog.DataFactory
Expand All @@ -55,12 +61,24 @@ type CkpReplayer struct {

readDuration, applyDuration time.Duration
readCount, applyCount, totalCount int

objectReplayWorker []sm.Queue
wg sync.WaitGroup
objectCountMap map[uint64]int
}

// Close releases everything the replayer holds: the registered close
// callbacks, the object-replay workers, and any loaded checkpoint data.
func (c *CkpReplayer) Close() {
	// Renamed from `close` to avoid shadowing the builtin of the same name.
	for _, closeCb := range c.closes {
		closeCb()
	}
	for _, worker := range c.objectReplayWorker {
		worker.Stop()
	}
	for _, data := range c.ckpdatas {
		if data != nil {
			data.Close()
		}
	}
}

func (c *CkpReplayer) ReadCkpFiles() (err error) {
Expand Down Expand Up @@ -187,7 +205,7 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) {

closecbs := make([]func(), 0)
c.totalCount = len(entries)
readfn := func(i int, readType uint16) {
readfn := func(i int, readType uint16) (err error) {
checkpointEntry := entries[i]
checkpointEntry.sid = r.rt.SID()
if checkpointEntry.end.LT(&maxGlobalEnd) {
Expand Down Expand Up @@ -218,6 +236,7 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) {
closecbs = append(closecbs, func() { datas[i].CloseWhenLoadFromCache(checkpointEntry.version) })
}
}
return nil
}
c.closes = append(c.closes, closecbs...)
t0 = time.Now()
Expand All @@ -229,16 +248,24 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) {
}
}
for i := 0; i < bat.Length(); i++ {
readfn(i, PrefetchMetaIdx)
if err = readfn(i, PrefetchMetaIdx); err != nil {
return
}
}
for i := 0; i < bat.Length(); i++ {
readfn(i, ReadMetaIdx)
if err = readfn(i, ReadMetaIdx); err != nil {
return
}
}
for i := 0; i < bat.Length(); i++ {
readfn(i, PrefetchData)
if err = readfn(i, PrefetchData); err != nil {
return
}
}
for i := 0; i < bat.Length(); i++ {
readfn(i, ReadData)
if err = readfn(i, ReadData); err != nil {
return
}
}
c.ckpdatas = datas
c.readDuration += time.Since(t0)
Expand Down Expand Up @@ -286,7 +313,7 @@ func (c *CkpReplayer) ReplayThreeTablesObjectlist(phase string) (
dataFactory := c.dataF
maxGlobal := r.MaxGlobalCheckpoint()
if maxGlobal != nil {
err = datas[c.globalCkpIdx].ApplyReplayTo(r.catalog, dataFactory, true)
err = datas[c.globalCkpIdx].ApplyReplayTo(c, r.catalog, dataFactory, true)
c.applyCount++
logger := logutil.Info
if err != nil {
Expand Down Expand Up @@ -332,7 +359,7 @@ func (c *CkpReplayer) ReplayThreeTablesObjectlist(phase string) (
continue
}
start := time.Now()
if err = datas[i].ApplyReplayTo(r.catalog, dataFactory, true); err != nil {
if err = datas[i].ApplyReplayTo(c, r.catalog, dataFactory, true); err != nil {
logger = logutil.Error
}
logger(
Expand Down Expand Up @@ -363,6 +390,7 @@ func (c *CkpReplayer) ReplayThreeTablesObjectlist(phase string) (
isLSNValid = false
}
}
c.wg.Wait()
return
}

Expand Down Expand Up @@ -419,7 +447,7 @@ func (c *CkpReplayer) ReplayObjectlist(phase string) (err error) {
var ckpVers []uint32
var ckpDatas []*logtail.CheckpointData
if maxGlobal := r.MaxGlobalCheckpoint(); maxGlobal != nil {
err = datas[c.globalCkpIdx].ApplyReplayTo(r.catalog, dataFactory, false)
err = datas[c.globalCkpIdx].ApplyReplayTo(c, r.catalog, dataFactory, false)
if err != nil {
return
}
Expand All @@ -436,6 +464,7 @@ func (c *CkpReplayer) ReplayObjectlist(phase string) (err error) {
continue
}
err = datas[i].ApplyReplayTo(
c,
r.catalog,
dataFactory,
false)
Expand All @@ -448,12 +477,22 @@ func (c *CkpReplayer) ReplayObjectlist(phase string) (err error) {
ckpVers = append(ckpVers, checkpointEntry.version)
ckpDatas = append(ckpDatas, datas[i])
}
c.wg.Wait()
c.applyDuration += time.Since(t0)
r.catalog.GetUsageMemo().(*logtail.TNUsageMemo).PrepareReplay(ckpDatas, ckpVers)
r.source.Init(maxTs)
maxTableID, maxObjectCount := uint64(0), 0
for tid, count := range c.objectCountMap {
if count > maxObjectCount {
maxTableID = tid
maxObjectCount = count
}
}
logutil.Info(
"Replay-Checkpoints",
zap.String("phase", phase),
zap.Uint64("max table tid", maxTableID),
zap.Int("object count (create count + delete count)", maxObjectCount),
zap.Duration("apply-cost", c.applyDuration),
zap.Duration("read-cost", c.readDuration),
zap.Int("apply-count", c.applyCount),
Expand All @@ -463,11 +502,32 @@ func (c *CkpReplayer) ReplayObjectlist(phase string) (err error) {
return
}

// Submit enqueues replayFn on the worker selected by tid, so that all
// objects of one table are replayed serially by the same worker while
// different tables proceed in parallel. It also counts submissions per
// table for the summary log; wg is released by the worker after replayFn
// runs.
// NOTE(review): objectCountMap is not synchronized — this assumes Submit
// is called from a single goroutine; confirm against callers.
func (c *CkpReplayer) Submit(tid uint64, replayFn func()) {
	c.wg.Add(1)
	workerOffset := tid % uint64(len(c.objectReplayWorker))
	c.objectCountMap[tid]++
	c.objectReplayWorker[workerOffset].Enqueue(replayFn)
}

func (r *runner) Replay(dataFactory catalog.DataFactory) *CkpReplayer {
return &CkpReplayer{
r: r,
dataF: dataFactory,
replayer := &CkpReplayer{
r: r,
dataF: dataFactory,
objectCountMap: make(map[uint64]int),
}
objectWorker := make([]sm.Queue, DefaultObjectReplayWorkerCount)
for i := 0; i < DefaultObjectReplayWorkerCount; i++ {
objectWorker[i] = sm.NewSafeQueue(10000, 100, func(items ...any) {
for _, item := range items {
fn := item.(func())
fn()
replayer.wg.Done()
}
})
objectWorker[i].Start()
}
replayer.objectReplayWorker = objectWorker
return replayer
}

func MergeCkpMeta(
Expand Down
Loading

0 comments on commit f491bcd

Please sign in to comment.