diff --git a/pkg/vm/engine/disttae/logtailreplay/change_handle.go b/pkg/vm/engine/disttae/logtailreplay/change_handle.go index bdb7d4421fb94..6e61cce04221c 100755 --- a/pkg/vm/engine/disttae/logtailreplay/change_handle.go +++ b/pkg/vm/engine/disttae/logtailreplay/change_handle.go @@ -634,7 +634,7 @@ func (p *baseHandle) QuickNext(ctx context.Context, bat **batch.Batch, mp *mpool err = p.cnObjectHandle.QuickNext(ctx, bat, mp) return } -func (p *baseHandle) newBatchHandleWithRowIterator(ctx context.Context, iter btree.IterG[RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (h *BatchHandle) { +func (p *baseHandle) newBatchHandleWithRowIterator(ctx context.Context, iter btree.IterG[*RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (h *BatchHandle) { bat := p.getBatchesFromRowIterator(iter, start, end, tombstone, mp) if bat == nil { return nil @@ -642,12 +642,12 @@ func (p *baseHandle) newBatchHandleWithRowIterator(ctx context.Context, iter btr h = NewRowHandle(bat, mp, p, ctx) return } -func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (bat *batch.Batch) { +func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[*RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (bat *batch.Batch) { for iter.Next() { entry := iter.Item() if checkTS(start, end, entry.Time) { if !entry.Deleted && !tombstone { - fillInInsertBatch(&bat, &entry, mp) + fillInInsertBatch(&bat, entry, mp) } if entry.Deleted && tombstone { if p.skipTS != nil { @@ -656,7 +656,7 @@ func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[RowEntry], start continue } } - fillInDeleteBatch(&bat, &entry, mp) + fillInDeleteBatch(&bat, entry, mp) } } } diff --git a/pkg/vm/engine/disttae/logtailreplay/partition_state.go b/pkg/vm/engine/disttae/logtailreplay/partition_state.go index 0d3ebc6edd5e0..ae90cb9fc656e 100644 --- a/pkg/vm/engine/disttae/logtailreplay/partition_state.go +++ b/pkg/vm/engine/disttae/logtailreplay/partition_state.go @@ -54,7 +54,7 @@ type PartitionState struct { tid uint64 // data - rows *btree.BTreeG[RowEntry] // use value type to avoid locking on elements + rows *btree.BTreeG[*RowEntry] // use value type to avoid locking on elements checkpoints []string //current partitionState can serve snapshot read only if start <= ts <= end @@ -233,7 +233,7 @@ func (p *PartitionState) HandleDataObjectList( for i := uint32(0); i < blkCnt; i++ { blkID := objectio.NewBlockidWithObjectID(objID, uint16(i)) - pivot := RowEntry{ + pivot := &RowEntry{ // aobj has only one blk BlockID: *blkID, } @@ -370,7 +370,7 @@ func (p *PartitionState) HandleTombstoneObjectList( truncatePoint := startTSCol[idx] - var deletedRow RowEntry + var deletedRow *RowEntry for ok := tbIter.Seek(&PrimaryIndexEntry{ Bytes: objEntry.ObjectName().ObjectId()[:], @@ -385,7 +385,7 @@ func (p *PartitionState) HandleTombstoneObjectList( break } - if deletedRow, exist = p.rows.Get(RowEntry{ + if deletedRow, exist = p.rows.Get(&RowEntry{ ID: tbIter.Item().RowEntryID, BlockID: tbIter.Item().BlockID, RowID: tbIter.Item().RowID, @@ -449,7 +449,7 @@ func (p *PartitionState) HandleRowsDelete( numDeletes := int64(0) for i, rowID := range rowIDVector { blockID := rowID.CloneBlockID() - pivot := RowEntry{ + pivot := &RowEntry{ BlockID: blockID, RowID: rowID, Time: timeVector[i], @@ -539,7 +539,7 @@ func (p *PartitionState) HandleRowsInsert( var numInserted int64 for i, rowID := range rowIDVector { blockID := rowID.CloneBlockID() - pivot := RowEntry{ + pivot := &RowEntry{ BlockID: blockID, RowID: rowID, Time: timeVector[i], @@ -648,7 +648,7 @@ func NewPartitionState( service: service, tid: tid, noData: noData, - rows: btree.NewBTreeGOptions(RowEntry.Less, opts), + rows: btree.NewBTreeGOptions((*RowEntry).Less, opts), dataObjectsNameIndex: btree.NewBTreeGOptions(objectio.ObjectEntry.ObjectNameIndexLess, opts), tombstoneObjectsNameIndex: btree.NewBTreeGOptions(objectio.ObjectEntry.ObjectNameIndexLess, opts), rowPrimaryKeyIndex: btree.NewBTreeGOptions((*PrimaryIndexEntry).Less, opts), @@ -796,7 +796,7 @@ func (p *PartitionState) PKExistInMemBetween( keys [][]byte, ) (bool, bool) { iter := p.rowPrimaryKeyIndex.Iter() - pivot := RowEntry{ + pivot := &RowEntry{ Time: types.BuildTS(math.MaxInt64, math.MaxUint32), } idxEntry := &PrimaryIndexEntry{} @@ -869,7 +869,7 @@ func (p *PartitionState) RowExists(rowID types.Rowid, ts types.TS) bool { defer iter.Release() blockID := rowID.CloneBlockID() - for ok := iter.Seek(RowEntry{ + for ok := iter.Seek(&RowEntry{ BlockID: blockID, RowID: rowID, Time: ts, @@ -918,7 +918,7 @@ func (p *PartitionState) IsEmpty() bool { func (p *PartitionState) LogAllRowEntry() string { var buf bytes.Buffer - _ = p.ScanRows(false, func(entry RowEntry) (bool, error) { + _ = p.ScanRows(false, func(entry *RowEntry) (bool, error) { buf.WriteString(entry.String()) buf.WriteString("\n") return true, nil @@ -928,19 +928,19 @@ func (p *PartitionState) LogAllRowEntry() string { func (p *PartitionState) ScanRows( reverse bool, - onItem func(entry RowEntry) (bool, error), + onItem func(entry *RowEntry) (bool, error), ) (err error) { var ok bool if !reverse { - p.rows.Scan(func(item RowEntry) bool { + p.rows.Scan(func(item *RowEntry) bool { if ok, err = onItem(item); err != nil || !ok { return false } return true }) } else { - p.rows.Reverse(func(item RowEntry) bool { + p.rows.Reverse(func(item *RowEntry) bool { if ok, err = onItem(item); err != nil || !ok { return false } @@ -955,7 +955,7 @@ func (p *PartitionState) CheckRowIdDeletedInMem(ts types.TS, rowId types.Rowid) iter := p.rows.Iter() defer iter.Release() - if !iter.Seek(RowEntry{ + if !iter.Seek(&RowEntry{ Time: ts, BlockID: rowId.CloneBlockID(), RowID: rowId, diff --git a/pkg/vm/engine/disttae/logtailreplay/partition_state_test.go b/pkg/vm/engine/disttae/logtailreplay/partition_state_test.go index 5732c9461bee4..42c986130d7b9 100644 --- a/pkg/vm/engine/disttae/logtailreplay/partition_state_test.go +++ b/pkg/vm/engine/disttae/logtailreplay/partition_state_test.go @@ -132,7 +132,7 @@ func TestScanRows(t *testing.T) { state := NewPartitionState("", true, 42) for i := uint32(0); i < 10; i++ { rid := types.BuildTestRowid(rand.Int63(), rand.Int63()) - state.rows.Set(RowEntry{ + state.rows.Set(&RowEntry{ BlockID: rid.CloneBlockID(), RowID: rid, Offset: int64(i), @@ -145,12 +145,12 @@ func TestScanRows(t *testing.T) { logutil.Info(state.LogAllRowEntry()) - _ = state.ScanRows(false, func(entry RowEntry) (bool, error) { + _ = state.ScanRows(false, func(entry *RowEntry) (bool, error) { logutil.Info(entry.String()) return true, nil }) - _ = state.ScanRows(true, func(entry RowEntry) (bool, error) { + _ = state.ScanRows(true, func(entry *RowEntry) (bool, error) { logutil.Info(entry.String()) return true, nil }) diff --git a/pkg/vm/engine/disttae/logtailreplay/rows_iter.go b/pkg/vm/engine/disttae/logtailreplay/rows_iter.go index a4b9c1c7b81c7..0c13a336a77aa 100644 --- a/pkg/vm/engine/disttae/logtailreplay/rows_iter.go +++ b/pkg/vm/engine/disttae/logtailreplay/rows_iter.go @@ -29,12 +29,12 @@ import ( type RowsIter interface { Next() bool Close() error - Entry() RowEntry + Entry() *RowEntry } type rowsIter struct { ts types.TS - iter btree.IterG[RowEntry] + iter btree.IterG[*RowEntry] firstCalled bool lastRowID types.Rowid checkBlockID bool @@ -48,7 +48,7 @@ func (p *rowsIter) Next() bool { for { if !p.firstCalled { if p.checkBlockID { - if !p.iter.Seek(RowEntry{ + if !p.iter.Seek(&RowEntry{ BlockID: p.blockID, }) { return false @@ -90,7 +90,7 @@ func (p *rowsIter) Next() bool { } } -func (p *rowsIter) Entry() RowEntry { +func (p *rowsIter) Entry() *RowEntry { return p.iter.Item() } @@ -103,10 +103,10 @@ type primaryKeyIter struct { ts types.TS spec PrimaryKeyMatchSpec iter btree.IterG[*PrimaryIndexEntry] - rows *btree.BTreeG[RowEntry] + rows *btree.BTreeG[*RowEntry] primaryIndex *btree.BTreeG[*PrimaryIndexEntry] tombstoneRowIdIdx *btree.BTreeG[*PrimaryIndexEntry] - curRow RowEntry + curRow *RowEntry specHint struct { isDelIter bool @@ -507,7 +507,7 @@ func (p *primaryKeyIter) isPKItemValid(pkItem PrimaryIndexEntry) bool { iter := p.rows.Iter() defer iter.Release() - var pivot = RowEntry{ + var pivot = &RowEntry{ Time: p.ts, BlockID: pkItem.BlockID, RowID: pkItem.RowID, @@ -556,7 +556,7 @@ func (p *primaryKeyIter) Next() bool { } } -func (p *primaryKeyIter) Entry() RowEntry { +func (p *primaryKeyIter) Entry() *RowEntry { return p.curRow } diff --git a/pkg/vm/engine/disttae/logtailreplay/types.go b/pkg/vm/engine/disttae/logtailreplay/types.go index 9e127bac88682..a67e9300688a8 100644 --- a/pkg/vm/engine/disttae/logtailreplay/types.go +++ b/pkg/vm/engine/disttae/logtailreplay/types.go @@ -54,7 +54,7 @@ func (r RowEntry) String() string { r.PrimaryIndexBytes) } -func (r RowEntry) Less(than RowEntry) bool { +func (r *RowEntry) Less(than *RowEntry) bool { // asc if cmp := r.BlockID.Compare(&than.BlockID); cmp != 0 { return cmp < 0 diff --git a/pkg/vm/engine/disttae/mo_table_stats.go b/pkg/vm/engine/disttae/mo_table_stats.go index ca57c5d0d7ce5..4509bd625c115 100644 --- a/pkg/vm/engine/disttae/mo_table_stats.go +++ b/pkg/vm/engine/disttae/mo_table_stats.go @@ -3036,8 +3036,8 @@ func applyTombstones( // // here, we only collect the deletes that have not paired inserts // according to its LESS function, the deletes come first - var lastInsert logtailreplay.RowEntry - err = pState.ScanRows(true, func(entry logtailreplay.RowEntry) (bool, error) { + var lastInsert *logtailreplay.RowEntry + err = pState.ScanRows(true, func(entry *logtailreplay.RowEntry) (bool, error) { if !entry.Deleted { lastInsert = entry return true, nil