Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce the memory consumption by storing points in the partition state. #21244

Merged
merged 4 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/vm/engine/disttae/logtailreplay/change_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,20 +634,20 @@ func (p *baseHandle) QuickNext(ctx context.Context, bat **batch.Batch, mp *mpool
err = p.cnObjectHandle.QuickNext(ctx, bat, mp)
return
}
func (p *baseHandle) newBatchHandleWithRowIterator(ctx context.Context, iter btree.IterG[RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (h *BatchHandle) {
func (p *baseHandle) newBatchHandleWithRowIterator(ctx context.Context, iter btree.IterG[*RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (h *BatchHandle) {
bat := p.getBatchesFromRowIterator(iter, start, end, tombstone, mp)
if bat == nil {
return nil
}
h = NewRowHandle(bat, mp, p, ctx)
return
}
func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (bat *batch.Batch) {
func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[*RowEntry], start, end types.TS, tombstone bool, mp *mpool.MPool) (bat *batch.Batch) {
for iter.Next() {
entry := iter.Item()
if checkTS(start, end, entry.Time) {
if !entry.Deleted && !tombstone {
fillInInsertBatch(&bat, &entry, mp)
fillInInsertBatch(&bat, entry, mp)
}
if entry.Deleted && tombstone {
if p.skipTS != nil {
Expand All @@ -656,7 +656,7 @@ func (p *baseHandle) getBatchesFromRowIterator(iter btree.IterG[RowEntry], start
continue
}
}
fillInDeleteBatch(&bat, &entry, mp)
fillInDeleteBatch(&bat, entry, mp)
}
}
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/vm/engine/disttae/logtailreplay/partition_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type PartitionState struct {
tid uint64

// data
rows *btree.BTreeG[RowEntry] // use value type to avoid locking on elements
rows *btree.BTreeG[*RowEntry] // use value type to avoid locking on elements

checkpoints []string
//current partitionState can serve snapshot read only if start <= ts <= end
Expand Down Expand Up @@ -233,7 +233,7 @@ func (p *PartitionState) HandleDataObjectList(
for i := uint32(0); i < blkCnt; i++ {

blkID := objectio.NewBlockidWithObjectID(objID, uint16(i))
pivot := RowEntry{
pivot := &RowEntry{
// aobj has only one blk
BlockID: *blkID,
}
Expand Down Expand Up @@ -370,7 +370,7 @@ func (p *PartitionState) HandleTombstoneObjectList(

truncatePoint := startTSCol[idx]

var deletedRow RowEntry
var deletedRow *RowEntry

for ok := tbIter.Seek(&PrimaryIndexEntry{
Bytes: objEntry.ObjectName().ObjectId()[:],
Expand All @@ -385,7 +385,7 @@ func (p *PartitionState) HandleTombstoneObjectList(
break
}

if deletedRow, exist = p.rows.Get(RowEntry{
if deletedRow, exist = p.rows.Get(&RowEntry{
ID: tbIter.Item().RowEntryID,
BlockID: tbIter.Item().BlockID,
RowID: tbIter.Item().RowID,
Expand Down Expand Up @@ -449,7 +449,7 @@ func (p *PartitionState) HandleRowsDelete(
numDeletes := int64(0)
for i, rowID := range rowIDVector {
blockID := rowID.CloneBlockID()
pivot := RowEntry{
pivot := &RowEntry{
BlockID: blockID,
RowID: rowID,
Time: timeVector[i],
Expand Down Expand Up @@ -539,7 +539,7 @@ func (p *PartitionState) HandleRowsInsert(
var numInserted int64
for i, rowID := range rowIDVector {
blockID := rowID.CloneBlockID()
pivot := RowEntry{
pivot := &RowEntry{
BlockID: blockID,
RowID: rowID,
Time: timeVector[i],
Expand Down Expand Up @@ -648,7 +648,7 @@ func NewPartitionState(
service: service,
tid: tid,
noData: noData,
rows: btree.NewBTreeGOptions(RowEntry.Less, opts),
rows: btree.NewBTreeGOptions((*RowEntry).Less, opts),
dataObjectsNameIndex: btree.NewBTreeGOptions(objectio.ObjectEntry.ObjectNameIndexLess, opts),
tombstoneObjectsNameIndex: btree.NewBTreeGOptions(objectio.ObjectEntry.ObjectNameIndexLess, opts),
rowPrimaryKeyIndex: btree.NewBTreeGOptions((*PrimaryIndexEntry).Less, opts),
Expand Down Expand Up @@ -796,7 +796,7 @@ func (p *PartitionState) PKExistInMemBetween(
keys [][]byte,
) (bool, bool) {
iter := p.rowPrimaryKeyIndex.Iter()
pivot := RowEntry{
pivot := &RowEntry{
Time: types.BuildTS(math.MaxInt64, math.MaxUint32),
}
idxEntry := &PrimaryIndexEntry{}
Expand Down Expand Up @@ -869,7 +869,7 @@ func (p *PartitionState) RowExists(rowID types.Rowid, ts types.TS) bool {
defer iter.Release()

blockID := rowID.CloneBlockID()
for ok := iter.Seek(RowEntry{
for ok := iter.Seek(&RowEntry{
BlockID: blockID,
RowID: rowID,
Time: ts,
Expand Down Expand Up @@ -918,7 +918,7 @@ func (p *PartitionState) IsEmpty() bool {

func (p *PartitionState) LogAllRowEntry() string {
var buf bytes.Buffer
_ = p.ScanRows(false, func(entry RowEntry) (bool, error) {
_ = p.ScanRows(false, func(entry *RowEntry) (bool, error) {
buf.WriteString(entry.String())
buf.WriteString("\n")
return true, nil
Expand All @@ -928,19 +928,19 @@ func (p *PartitionState) LogAllRowEntry() string {

func (p *PartitionState) ScanRows(
reverse bool,
onItem func(entry RowEntry) (bool, error),
onItem func(entry *RowEntry) (bool, error),
) (err error) {
var ok bool

if !reverse {
p.rows.Scan(func(item RowEntry) bool {
p.rows.Scan(func(item *RowEntry) bool {
if ok, err = onItem(item); err != nil || !ok {
return false
}
return true
})
} else {
p.rows.Reverse(func(item RowEntry) bool {
p.rows.Reverse(func(item *RowEntry) bool {
if ok, err = onItem(item); err != nil || !ok {
return false
}
Expand All @@ -955,7 +955,7 @@ func (p *PartitionState) CheckRowIdDeletedInMem(ts types.TS, rowId types.Rowid)
iter := p.rows.Iter()
defer iter.Release()

if !iter.Seek(RowEntry{
if !iter.Seek(&RowEntry{
Time: ts,
BlockID: rowId.CloneBlockID(),
RowID: rowId,
Expand Down
6 changes: 3 additions & 3 deletions pkg/vm/engine/disttae/logtailreplay/partition_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestScanRows(t *testing.T) {
state := NewPartitionState("", true, 42)
for i := uint32(0); i < 10; i++ {
rid := types.BuildTestRowid(rand.Int63(), rand.Int63())
state.rows.Set(RowEntry{
state.rows.Set(&RowEntry{
BlockID: rid.CloneBlockID(),
RowID: rid,
Offset: int64(i),
Expand All @@ -145,12 +145,12 @@ func TestScanRows(t *testing.T) {

logutil.Info(state.LogAllRowEntry())

_ = state.ScanRows(false, func(entry RowEntry) (bool, error) {
_ = state.ScanRows(false, func(entry *RowEntry) (bool, error) {
logutil.Info(entry.String())
return true, nil
})

_ = state.ScanRows(true, func(entry RowEntry) (bool, error) {
_ = state.ScanRows(true, func(entry *RowEntry) (bool, error) {
logutil.Info(entry.String())
return true, nil
})
Expand Down
16 changes: 8 additions & 8 deletions pkg/vm/engine/disttae/logtailreplay/rows_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (
type RowsIter interface {
Next() bool
Close() error
Entry() RowEntry
Entry() *RowEntry
}

type rowsIter struct {
ts types.TS
iter btree.IterG[RowEntry]
iter btree.IterG[*RowEntry]
firstCalled bool
lastRowID types.Rowid
checkBlockID bool
Expand All @@ -48,7 +48,7 @@ func (p *rowsIter) Next() bool {
for {
if !p.firstCalled {
if p.checkBlockID {
if !p.iter.Seek(RowEntry{
if !p.iter.Seek(&RowEntry{
BlockID: p.blockID,
}) {
return false
Expand Down Expand Up @@ -90,7 +90,7 @@ func (p *rowsIter) Next() bool {
}
}

func (p *rowsIter) Entry() RowEntry {
func (p *rowsIter) Entry() *RowEntry {
return p.iter.Item()
}

Expand All @@ -103,10 +103,10 @@ type primaryKeyIter struct {
ts types.TS
spec PrimaryKeyMatchSpec
iter btree.IterG[*PrimaryIndexEntry]
rows *btree.BTreeG[RowEntry]
rows *btree.BTreeG[*RowEntry]
primaryIndex *btree.BTreeG[*PrimaryIndexEntry]
tombstoneRowIdIdx *btree.BTreeG[*PrimaryIndexEntry]
curRow RowEntry
curRow *RowEntry

specHint struct {
isDelIter bool
Expand Down Expand Up @@ -507,7 +507,7 @@ func (p *primaryKeyIter) isPKItemValid(pkItem PrimaryIndexEntry) bool {
iter := p.rows.Iter()
defer iter.Release()

var pivot = RowEntry{
var pivot = &RowEntry{
Time: p.ts,
BlockID: pkItem.BlockID,
RowID: pkItem.RowID,
Expand Down Expand Up @@ -556,7 +556,7 @@ func (p *primaryKeyIter) Next() bool {
}
}

func (p *primaryKeyIter) Entry() RowEntry {
func (p *primaryKeyIter) Entry() *RowEntry {
return p.curRow
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/logtailreplay/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r RowEntry) String() string {
r.PrimaryIndexBytes)
}

func (r RowEntry) Less(than RowEntry) bool {
func (r *RowEntry) Less(than *RowEntry) bool {
// asc
if cmp := r.BlockID.Compare(&than.BlockID); cmp != 0 {
return cmp < 0
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/disttae/mo_table_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3036,8 +3036,8 @@ func applyTombstones(
//
// here, we only collect the deletes that have not paired inserts
// according to its LESS function, the deletes come first
var lastInsert logtailreplay.RowEntry
err = pState.ScanRows(true, func(entry logtailreplay.RowEntry) (bool, error) {
var lastInsert *logtailreplay.RowEntry
err = pState.ScanRows(true, func(entry *logtailreplay.RowEntry) (bool, error) {
if !entry.Deleted {
lastInsert = entry
return true, nil
Expand Down
Loading