diff --git a/pkg/vm/engine/disttae/logtailreplay/change_handle.go b/pkg/vm/engine/disttae/logtailreplay/change_handle.go index 896f54c05979c..a72d077b55d36 100755 --- a/pkg/vm/engine/disttae/logtailreplay/change_handle.go +++ b/pkg/vm/engine/disttae/logtailreplay/change_handle.go @@ -634,7 +634,7 @@ func (p *baseHandle) QuickNext(ctx context.Context, bat **batch.Batch, mp *mpool err = p.cnObjectHandle.QuickNext(ctx, bat, mp) return } -func (p *baseHandle) newBatchHandleWithRowIterator(ctx context.Context, iter btree.IterG[RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (h *BatchHandle) { +func (p *baseHandle) newBatchHandleWithRowIterator(ctx context.Context, iter btree.IterG[*RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (h *BatchHandle) { bat := p.getBatchesFromRowIterator(iter, start, end, tombstone, mp) if bat == nil { return nil @@ -642,12 +642,12 @@ func (p *baseHandle) newBatchHandleWithRowIterator(ctx context.Context, iter btr h = NewRowHandle(bat, mp, p, ctx) return } -func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (bat *batch.Batch) { +func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[*RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (bat *batch.Batch) { for iter.Next() { entry := iter.Item() if checkTS(start, end, entry.Time) { if !entry.Deleted && !tombstone { - fillInInsertBatch(&bat, &entry, mp) + fillInInsertBatch(&bat, entry, mp) } if entry.Deleted && tombstone { if p.skipTS != nil { @@ -656,7 +656,7 @@ func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[RowEntry], start continue } } - fillInDeleteBatch(&bat, &entry, mp) + fillInDeleteBatch(&bat, entry, mp) } } } diff --git a/pkg/vm/engine/disttae/logtailreplay/partition_state.go b/pkg/vm/engine/disttae/logtailreplay/partition_state.go index c5b36724e0466..19ae2f2b1f1af 100644 --- a/pkg/vm/engine/disttae/logtailreplay/partition_state.go +++ b/pkg/vm/engine/disttae/logtailreplay/partition_state.go @@ -53,7 +53,7 @@ type PartitionState struct { tid uint64 // data - rows *btree.BTreeG[RowEntry] // use value type to avoid locking on elements + rows *btree.BTreeG[*RowEntry] // use value type to avoid locking on elements checkpoints []string //current partitionState can serve snapshot read only if start <= ts <= end @@ -232,7 +232,7 @@ func (p *PartitionState) HandleDataObjectList( for i := uint32(0); i < blkCnt; i++ { blkID := objectio.NewBlockidWithObjectID(objID, uint16(i)) - pivot := RowEntry{ + pivot := &RowEntry{ // aobj has only one blk BlockID: *blkID, } @@ -369,7 +369,7 @@ func (p *PartitionState) HandleTombstoneObjectList( truncatePoint := startTSCol[idx] - var deletedRow RowEntry + var deletedRow *RowEntry for ok := tbIter.Seek(&PrimaryIndexEntry{ Bytes: objEntry.ObjectName().ObjectId()[:], @@ -384,7 +384,7 @@ func (p *PartitionState) HandleTombstoneObjectList( break } - if deletedRow, exist = p.rows.Get(RowEntry{ + if deletedRow, exist = p.rows.Get(&RowEntry{ ID: tbIter.Item().RowEntryID, BlockID: tbIter.Item().BlockID, RowID: tbIter.Item().RowID, @@ -448,7 +448,7 @@ func (p *PartitionState) HandleRowsDelete( numDeletes := int64(0) for i, rowID := range rowIDVector { blockID := rowID.CloneBlockID() - pivot := RowEntry{ + pivot := &RowEntry{ BlockID: blockID, RowID: rowID, Time: timeVector[i], @@ -538,7 +538,7 @@ func (p *PartitionState) HandleRowsInsert( var numInserted int64 for i, rowID := range rowIDVector { blockID := rowID.CloneBlockID() - pivot := RowEntry{ + pivot := &RowEntry{ BlockID: blockID, RowID: rowID, Time: timeVector[i], @@ -648,7 +648,7 @@ func NewPartitionState( service: service, tid: tid, noData: noData, - rows: btree.NewBTreeGOptions(RowEntry.Less, opts), + rows: btree.NewBTreeGOptions((*RowEntry).Less, opts), dataObjectsNameIndex: btree.NewBTreeGOptions(ObjectEntry.ObjectNameIndexLess, opts), tombstoneObjectsNameIndex: btree.NewBTreeGOptions(ObjectEntry.ObjectNameIndexLess, opts), rowPrimaryKeyIndex: btree.NewBTreeGOptions((*PrimaryIndexEntry).Less, opts), @@ -789,7 +789,7 @@ func (p *PartitionState) PKExistInMemBetween( keys [][]byte, ) (bool, bool) { iter := p.rowPrimaryKeyIndex.Copy().Iter() - pivot := RowEntry{ + pivot := &RowEntry{ Time: types.BuildTS(math.MaxInt64, math.MaxUint32), } idxEntry := &PrimaryIndexEntry{} @@ -862,7 +862,7 @@ func (p *PartitionState) RowExists(rowID types.Rowid, ts types.TS) bool { defer iter.Release() blockID := rowID.CloneBlockID() - for ok := iter.Seek(RowEntry{ + for ok := iter.Seek(&RowEntry{ BlockID: blockID, RowID: rowID, Time: ts, @@ -911,7 +911,7 @@ func (p *PartitionState) IsEmpty() bool { func (p *PartitionState) LogAllRowEntry() string { var buf bytes.Buffer - _ = p.ScanRows(false, func(entry RowEntry) (bool, error) { + _ = p.ScanRows(false, func(entry *RowEntry) (bool, error) { buf.WriteString(entry.String()) buf.WriteString("\n") return true, nil @@ -921,19 +921,19 @@ func (p *PartitionState) LogAllRowEntry() string { func (p *PartitionState) ScanRows( reverse bool, - onItem func(entry RowEntry) (bool, error), + onItem func(entry *RowEntry) (bool, error), ) (err error) { var ok bool if !reverse { - p.rows.Scan(func(item RowEntry) bool { + p.rows.Scan(func(item *RowEntry) bool { if ok, err = onItem(item); err != nil || !ok { return false } return true }) } else { - p.rows.Reverse(func(item RowEntry) bool { + p.rows.Reverse(func(item *RowEntry) bool { if ok, err = onItem(item); err != nil || !ok { return false } @@ -948,7 +948,7 @@ func (p *PartitionState) CheckRowIdDeletedInMem(ts types.TS, rowId types.Rowid) iter := p.rows.Copy().Iter() defer iter.Release() - if !iter.Seek(RowEntry{ + if !iter.Seek(&RowEntry{ Time: ts, BlockID: rowId.CloneBlockID(), RowID: rowId, diff --git a/pkg/vm/engine/disttae/logtailreplay/partition_state_test.go b/pkg/vm/engine/disttae/logtailreplay/partition_state_test.go index d330e441be68e..be4590b0f51ec 100644 --- a/pkg/vm/engine/disttae/logtailreplay/partition_state_test.go +++ b/pkg/vm/engine/disttae/logtailreplay/partition_state_test.go @@ -133,7 +133,7 @@ func TestScanRows(t *testing.T) { state := NewPartitionState("", true, 42) for i := uint32(0); i < 10; i++ { rid := types.BuildTestRowid(rand.Int63(), rand.Int63()) - state.rows.Set(RowEntry{ + state.rows.Set(&RowEntry{ BlockID: rid.CloneBlockID(), RowID: rid, Offset: int64(i), @@ -146,12 +146,12 @@ func TestScanRows(t *testing.T) { logutil.Info(state.LogAllRowEntry()) - _ = state.ScanRows(false, func(entry RowEntry) (bool, error) { + _ = state.ScanRows(false, func(entry *RowEntry) (bool, error) { logutil.Info(entry.String()) return true, nil }) - _ = state.ScanRows(true, func(entry RowEntry) (bool, error) { + _ = state.ScanRows(true, func(entry *RowEntry) (bool, error) { logutil.Info(entry.String()) return true, nil }) diff --git a/pkg/vm/engine/disttae/logtailreplay/rows_iter.go b/pkg/vm/engine/disttae/logtailreplay/rows_iter.go index 1ad00a74d3ad5..75ddc45437308 100644 --- a/pkg/vm/engine/disttae/logtailreplay/rows_iter.go +++ b/pkg/vm/engine/disttae/logtailreplay/rows_iter.go @@ -27,12 +27,12 @@ import ( type RowsIter interface { Next() bool Close() error - Entry() RowEntry + Entry() *RowEntry } type rowsIter struct { ts types.TS - iter btree.IterG[RowEntry] + iter btree.IterG[*RowEntry] firstCalled bool lastRowID types.Rowid checkBlockID bool @@ -46,7 +46,7 @@ func (p *rowsIter) Next() bool { for { if !p.firstCalled { if p.checkBlockID { - if !p.iter.Seek(RowEntry{ + if !p.iter.Seek(&RowEntry{ BlockID: p.blockID, }) { return false @@ -88,7 +88,7 @@ func (p *rowsIter) Next() bool { } } -func (p *rowsIter) Entry() RowEntry { +func (p *rowsIter) Entry() *RowEntry { return p.iter.Item() } @@ -101,10 +101,10 @@ type primaryKeyIter struct { ts types.TS spec PrimaryKeyMatchSpec iter btree.IterG[*PrimaryIndexEntry] - rows *btree.BTreeG[RowEntry] + rows *btree.BTreeG[*RowEntry] primaryIndex *btree.BTreeG[*PrimaryIndexEntry] tombstoneRowIdIdx *btree.BTreeG[*PrimaryIndexEntry] - curRow RowEntry + curRow *RowEntry specHint struct { isDelIter bool @@ -539,7 +539,7 @@ func (p *primaryKeyIter) isPKItemValid(pkItem PrimaryIndexEntry) bool { iter := p.rows.Iter() defer iter.Release() - var pivot = RowEntry{ + var pivot = &RowEntry{ Time: p.ts, BlockID: pkItem.BlockID, RowID: pkItem.RowID, @@ -588,7 +588,7 @@ func (p *primaryKeyIter) Next() bool { } } -func (p *primaryKeyIter) Entry() RowEntry { +func (p *primaryKeyIter) Entry() *RowEntry { return p.curRow } diff --git a/pkg/vm/engine/disttae/logtailreplay/types.go b/pkg/vm/engine/disttae/logtailreplay/types.go index b37b071893432..9465c0230a5d8 100644 --- a/pkg/vm/engine/disttae/logtailreplay/types.go +++ b/pkg/vm/engine/disttae/logtailreplay/types.go @@ -126,7 +126,7 @@ func (r RowEntry) String() string { r.PrimaryIndexBytes) } -func (r RowEntry) Less(than RowEntry) bool { +func (r *RowEntry) Less(than *RowEntry) bool { // asc if cmp := r.BlockID.Compare(&than.BlockID); cmp != 0 { return cmp < 0 diff --git a/pkg/vm/engine/disttae/mo_table_stats.go b/pkg/vm/engine/disttae/mo_table_stats.go index c4cef47b8fec6..2b23301932f2b 100644 --- a/pkg/vm/engine/disttae/mo_table_stats.go +++ b/pkg/vm/engine/disttae/mo_table_stats.go @@ -2405,8 +2405,8 @@ func applyTombstones( // // here, we only collect the deletes that have not paired inserts // according to its LESS function, the deletes come first - var lastInsert logtailreplay.RowEntry - err = pState.ScanRows(true, func(entry logtailreplay.RowEntry) (bool, error) { + var lastInsert *logtailreplay.RowEntry + err = pState.ScanRows(true, func(entry *logtailreplay.RowEntry) (bool, error) { if !entry.Deleted { lastInsert = entry return true, nil