Skip to content

Commit

Permalink
Add last ckp backup (#68)
Browse files Browse the repository at this point in the history
* Update code

* Update code

* Update code

* Fix bug
  • Loading branch information
LeftHandCold authored and aptend committed Jan 17, 2025
1 parent 81ebdfc commit eae0aed
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
9 changes: 8 additions & 1 deletion pkg/backup/tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ func execBackup(
}()
now := time.Now()
baseTS := ts
// When rewriting the checkpoint and trimming the aobject,
// you need to collect the atombstone in the last checkpoint
// Before this, only the last special checkpoint needs to be collected
var lastData *logtail.CheckpointData
for i, name := range names {
if len(name) == 0 {
continue
Expand Down Expand Up @@ -322,6 +326,9 @@ func execBackup(
}
defer data.Close()
oNames = append(oNames, oneNames...)
if i == len(names)-1 {
lastData = data
}
}
loadDuration += time.Since(now)

Expand Down Expand Up @@ -402,7 +409,7 @@ func execBackup(
tnLocation objectio.Location
)
cnLocation, tnLocation, checkpointFiles, err = logtail.ReWriteCheckpointAndBlockFromKey(ctx, sid, srcFs, dstFs,
cnLocation, uint32(version), start)
cnLocation, lastData, uint32(version), start)
for _, name := range checkpointFiles {
dentry, err := dstFs.StatFile(ctx, name)
if err != nil {
Expand Down
46 changes: 45 additions & 1 deletion pkg/vm/engine/tae/logtail/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ func ReWriteCheckpointAndBlockFromKey(
sid string,
fs, dstFs fileservice.FileService,
loc objectio.Location,
lastCkpData *CheckpointData,
version uint32, ts types.TS,
) (objectio.Location, objectio.Location, []string, error) {
logutil.Info("[Start]", common.OperationField("ReWrite Checkpoint"),
Expand All @@ -505,6 +506,9 @@ func ReWriteCheckpointAndBlockFromKey(
}()
objectsData := make(map[string]*objData, 0)
tombstonesData := make(map[string]*objData, 0)
// tombstonesData2 is the tombstone recorded in the last checkpoint,
// only used when cutting aobject, and does not need to modify itself
tombstonesData2 := make(map[string]*objData, 0)

defer func() {
for i := range objectsData {
Expand Down Expand Up @@ -564,8 +568,36 @@ func ReWriteCheckpointAndBlockFromKey(
return objInfoData
}

initData2 := func(
od *map[string]*objData,
idx uint16,
dataType objectio.DataMetaType,
) *containers.Batch {
objInfoData := lastCkpData.bats[idx]
objInfoStats := objInfoData.GetVectorByName(ObjectAttr_ObjectStats)
objInfoTid := objInfoData.GetVectorByName(SnapshotAttr_TID)
objInfoDelete := objInfoData.GetVectorByName(EntryNode_DeleteAt)

for i := 0; i < objInfoData.Length(); i++ {
stats := objectio.NewObjectStats()
stats.UnMarshal(objInfoStats.Get(i).([]byte))
appendable := stats.GetAppendable()
deleteAt := objInfoDelete.Get(i).(types.TS)
tid := objInfoTid.Get(i).(uint64)
if deleteAt.IsEmpty() {
continue
}
if !appendable {
continue
}
addObjectToObjectData(stats, appendable, i, tid, dataType, od)
}
return objInfoData
}

objInfoData := initData(&objectsData, ObjectInfoIDX, objectio.SchemaData)
tombstoneInfoData := initData(&tombstonesData, TombstoneObjectInfoIDX, objectio.SchemaTombstone)
initData2(&tombstonesData2, TombstoneObjectInfoIDX, objectio.SchemaTombstone)

phaseNumber = 3

Expand All @@ -574,6 +606,11 @@ func ReWriteCheckpointAndBlockFromKey(
if err != nil {
return nil, nil, nil, err
}
// Trim tombstone files based on timestamp
err = trimTombstoneData(ctx, fs, ts, &tombstonesData2)
if err != nil {
return nil, nil, nil, err
}

backupPool := dbutils.MakeDefaultSmallPool("backup-vector-pool")
defer backupPool.Destory()
Expand Down Expand Up @@ -649,10 +686,17 @@ func ReWriteCheckpointAndBlockFromKey(
return nil
}

// tombstonesData2 is used to merge the source of ds
dsTombstone := tombstonesData2
for key, objectData := range tombstonesData {
if dsTombstone[key] == nil {
dsTombstone[key] = objectData
}
}
err = insertBatchFun(
objectsData,
func(oData *objData, writer *ioutil.BlockWriter) (bool, error) {
ds := NewBackupDeltaLocDataSource(ctx, fs, ts, tombstonesData)
ds := NewBackupDeltaLocDataSource(ctx, fs, ts, dsTombstone)
blk := oData.stats.ConstructBlockInfo(uint16(0))
bat, sortKey, err := blockio.BlockDataReadBackup(ctx, &blk, ds, nil, ts, fs)
if err != nil {
Expand Down

0 comments on commit eae0aed

Please sign in to comment.