From eae0aed1b14c8d6c0146abb72d5bbb03c48b4fb5 Mon Sep 17 00:00:00 2001 From: GreatRiver <14086886+LeftHandCold@users.noreply.github.com> Date: Tue, 14 Jan 2025 17:36:58 +0800 Subject: [PATCH] Add last ckp backup (#68) * Update code * Update code * Update code * Fix bug --- pkg/backup/tae.go | 9 +++++- pkg/vm/engine/tae/logtail/backup.go | 46 ++++++++++++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/pkg/backup/tae.go b/pkg/backup/tae.go index 822497a22d030..5297a261d4e0f 100644 --- a/pkg/backup/tae.go +++ b/pkg/backup/tae.go @@ -293,6 +293,10 @@ func execBackup( }() now := time.Now() baseTS := ts + // When rewriting the checkpoint and trimming the aobject, + // you need to collect the atombstone in the last checkpoint + // Before this, only the last special checkpoint needs to be collected + var lastData *logtail.CheckpointData for i, name := range names { if len(name) == 0 { continue @@ -322,6 +326,9 @@ func execBackup( } defer data.Close() oNames = append(oNames, oneNames...) + if i == len(names)-1 { + lastData = data + } } loadDuration += time.Since(now) @@ -402,7 +409,7 @@ func execBackup( tnLocation objectio.Location ) cnLocation, tnLocation, checkpointFiles, err = logtail.ReWriteCheckpointAndBlockFromKey(ctx, sid, srcFs, dstFs, - cnLocation, uint32(version), start) + cnLocation, lastData, uint32(version), start) for _, name := range checkpointFiles { dentry, err := dstFs.StatFile(ctx, name) if err != nil { diff --git a/pkg/vm/engine/tae/logtail/backup.go b/pkg/vm/engine/tae/logtail/backup.go index 28af96bb28b65..30d17ab906f55 100644 --- a/pkg/vm/engine/tae/logtail/backup.go +++ b/pkg/vm/engine/tae/logtail/backup.go @@ -488,6 +488,7 @@ func ReWriteCheckpointAndBlockFromKey( sid string, fs, dstFs fileservice.FileService, loc objectio.Location, + lastCkpData *CheckpointData, version uint32, ts types.TS, ) (objectio.Location, objectio.Location, []string, error) { logutil.Info("[Start]", common.OperationField("ReWrite Checkpoint"), @@ -505,6 +506,9 @@ func ReWriteCheckpointAndBlockFromKey( }() objectsData := make(map[string]*objData, 0) tombstonesData := make(map[string]*objData, 0) + // tombstonesData2 is the tombstone recorded in the last checkpoint, + // only used when cutting aobject, and does not need to modify itself + tombstonesData2 := make(map[string]*objData, 0) defer func() { for i := range objectsData { @@ -564,8 +568,36 @@ func ReWriteCheckpointAndBlockFromKey( return objInfoData } + initData2 := func( + od *map[string]*objData, + idx uint16, + dataType objectio.DataMetaType, + ) *containers.Batch { + objInfoData := lastCkpData.bats[idx] + objInfoStats := objInfoData.GetVectorByName(ObjectAttr_ObjectStats) + objInfoTid := objInfoData.GetVectorByName(SnapshotAttr_TID) + objInfoDelete := objInfoData.GetVectorByName(EntryNode_DeleteAt) + + for i := 0; i < objInfoData.Length(); i++ { + stats := objectio.NewObjectStats() + stats.UnMarshal(objInfoStats.Get(i).([]byte)) + appendable := stats.GetAppendable() + deleteAt := objInfoDelete.Get(i).(types.TS) + tid := objInfoTid.Get(i).(uint64) + if deleteAt.IsEmpty() { + continue + } + if !appendable { + continue + } + addObjectToObjectData(stats, appendable, i, tid, dataType, od) + } + return objInfoData + } + objInfoData := initData(&objectsData, ObjectInfoIDX, objectio.SchemaData) tombstoneInfoData := initData(&tombstonesData, TombstoneObjectInfoIDX, objectio.SchemaTombstone) + initData2(&tombstonesData2, TombstoneObjectInfoIDX, objectio.SchemaTombstone) phaseNumber = 3 @@ -574,6 +606,11 @@ func ReWriteCheckpointAndBlockFromKey( if err != nil { return nil, nil, nil, err } + // Trim tombstone files based on timestamp + err = trimTombstoneData(ctx, fs, ts, &tombstonesData2) + if err != nil { + return nil, nil, nil, err + } backupPool := dbutils.MakeDefaultSmallPool("backup-vector-pool") defer backupPool.Destory() @@ -649,10 +686,17 @@ func ReWriteCheckpointAndBlockFromKey( return nil } + // tombstonesData2 is used to merge the source of ds + dsTombstone := tombstonesData2 + for key, objectData := range tombstonesData { + if dsTombstone[key] == nil { + dsTombstone[key] = objectData + } + } err = insertBatchFun( objectsData, func(oData *objData, writer *ioutil.BlockWriter) (bool, error) { - ds := NewBackupDeltaLocDataSource(ctx, fs, ts, tombstonesData) + ds := NewBackupDeltaLocDataSource(ctx, fs, ts, dsTombstone) blk := oData.stats.ConstructBlockInfo(uint16(0)) bat, sortKey, err := blockio.BlockDataReadBackup(ctx, &blk, ds, nil, ts, fs) if err != nil {