Skip to content

Commit

Permalink
[CherryPick]Fix bug for 21179 (#21191)
Browse files Browse the repository at this point in the history
1.Fix bug

Approved by: @XuPeng-SH, @sukki37
  • Loading branch information
triump2020 authored Jan 14, 2025
1 parent 81ac6e6 commit 88803ef
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 22 deletions.
82 changes: 69 additions & 13 deletions pkg/vm/engine/disttae/local_disttae_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"slices"
"sort"

"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
Expand All @@ -37,7 +39,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"go.uber.org/zap"
)

func NewLocalDataSource(
Expand Down Expand Up @@ -117,8 +118,12 @@ type LocalDisttaeDataSource struct {
insIter logtailreplay.RowsIter
}

table *txnTable
table *txnTable

wsCursor int
cachedBat *batch.Batch
sels []int64

txnOffset int

// runtime config
Expand Down Expand Up @@ -411,12 +416,10 @@ func checkWorkspaceEntryType(
entry.bat.Attrs[0] == catalog.BlockMeta_MetaLoc {
return false
}
if left, exist := tbl.getTxn().batchSelectList[entry.bat]; exist && len(left) == 0 {
if deleted, exist := tbl.getTxn().batchSelectList[entry.bat]; exist &&
len(deleted) == entry.bat.RowCount() {
// all rows have deleted in this bat
return false
} else if len(left) > 0 {
// FIXME: if len(left) > 0, we need to exclude the deleted rows in this batch
logutil.Fatal("FIXME: implement later")
}
return true
}
Expand Down Expand Up @@ -444,7 +447,6 @@ func (ls *LocalDisttaeDataSource) filterInMemUnCommittedInserts(

rows := 0
writes := ls.table.getTxn().writes
maxRows := objectio.BlockMaxRows
if len(writes) == 0 {
return nil
}
Expand All @@ -464,14 +466,57 @@ func (ls *LocalDisttaeDataSource) filterInMemUnCommittedInserts(
pkSeqNums++
}

for ; ls.wsCursor < ls.txnOffset; ls.wsCursor++ {
if writes[ls.wsCursor].bat == nil {
for {

for {
if ls.cachedBat == nil {
break
}

var sels []int64
if len(ls.sels) >= objectio.BlockMaxRows {
sels = ls.sels[:objectio.BlockMaxRows]
ls.sels = ls.sels[objectio.BlockMaxRows:]
} else {
sels = ls.sels
}

for i, destVec := range outBatch.Vecs {
colIdx := int(seqNums[i])
if colIdx != objectio.SEQNUM_ROWID {
colIdx++
} else {
colIdx = 0
}
if err := destVec.Union(ls.cachedBat.Vecs[colIdx], sels, mp); err != nil {
return err
}
}

if len(sels) == objectio.BlockMaxRows {
outBatch.SetRowCount(outBatch.Vecs[0].Length())
return nil
}

ls.cachedBat = nil
rows += len(sels)
ls.wsCursor++

}

if ls.wsCursor >= ls.txnOffset {
break
}

if writes[ls.wsCursor].bat == nil || writes[ls.wsCursor].bat.RowCount() == 0 {
ls.wsCursor++
continue
}

entry := writes[ls.wsCursor]

if ok := checkWorkspaceEntryType(ls.table, entry, true); !ok {
ls.wsCursor++
continue
}

Expand All @@ -491,9 +536,10 @@ func (ls *LocalDisttaeDataSource) filterInMemUnCommittedInserts(
skipMask.Release()

if len(offsets) == 0 {
ls.wsCursor++
continue
}

//row ids in retainedRowIds come from the same block, pls ref to writeBatch().
b := retainedRowIds[0].BorrowBlockID()
sels, err := ls.ApplyTombstones(
ls.ctx, b, offsets, engine.Policy_CheckUnCommittedOnly)
Expand All @@ -502,13 +548,15 @@ func (ls *LocalDisttaeDataSource) filterInMemUnCommittedInserts(
}

if len(sels) == 0 {
ls.wsCursor++
continue
}

if rows+len(sels) > maxRows {
break
if rows+len(sels) >= objectio.BlockMaxRows {
ls.cachedBat = entry.bat
ls.sels = sels[objectio.BlockMaxRows-rows:]
sels = sels[:objectio.BlockMaxRows-rows]
}
rows += len(sels)

for i, destVec := range outBatch.Vecs {
colIdx := int(seqNums[i])
Expand All @@ -521,6 +569,14 @@ func (ls *LocalDisttaeDataSource) filterInMemUnCommittedInserts(
return err
}
}

if rows+len(sels) == objectio.BlockMaxRows {
break
}

rows += len(sels)
ls.wsCursor++

}

outBatch.SetRowCount(outBatch.Vecs[0].Length())
Expand Down
11 changes: 5 additions & 6 deletions pkg/vm/engine/disttae/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,14 +1040,13 @@ func (txn *Transaction) deleteTableWrites(
continue
}
for k, v := range vs {
if _, ok := mp[v]; !ok {
// if the v is not to be deleted, then add its index into the sels.
if _, ok := mp[v]; ok {
// if the v will be deleted, then add its index into the sels.
sels = append(sels, int64(k))
} else {
mp[v]++
}
}
if len(sels) != len(vs) {
if len(sels) > 0 {
txn.batchSelectList[entry.bat] = append(txn.batchSelectList[entry.bat], sels...)
}
}
Expand Down Expand Up @@ -1081,8 +1080,8 @@ func (txn *Transaction) mergeTxnWorkspaceLocked(ctx context.Context) error {
if len(txn.batchSelectList) > 0 {
for _, e := range txn.writes {
if sels, ok := txn.batchSelectList[e.bat]; ok {
txn.approximateInMemInsertCnt -= e.bat.RowCount() - len(sels)
e.bat.Shrink(sels, false)
txn.approximateInMemInsertCnt -= len(sels)
e.bat.Shrink(sels, true)
delete(txn.batchSelectList, e.bat)
}
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1378,13 +1378,12 @@ func (tbl *txnTable) AlterTable(ctx context.Context, c *engine.ConstraintDef, re
if err := tbl.db.createWithID(ctx, tbl.tableName, tbl.tableId, tbl.defs, !createdInTxn); err != nil {
return err
}

if createdInTxn {
// 3. adjust writes for the table
txn.Lock()
for i, n := 0, len(txn.writes); i < n; i++ {
if cur := txn.writes[i]; cur.tableId == tbl.tableId && cur.bat != nil && cur.bat.RowCount() > 0 {
if sels, exist := txn.batchSelectList[cur.bat]; exist && len(sels) == 0 {
if sels, exist := txn.batchSelectList[cur.bat]; exist && len(sels) == cur.bat.RowCount() {
continue
}
txn.writes = append(txn.writes, txn.writes[i]) // copy by value
Expand All @@ -1394,7 +1393,10 @@ func (tbl *txnTable) AlterTable(ctx context.Context, c *engine.ConstraintDef, re
if err != nil {
return err
}
txn.batchSelectList[cur.bat] = []int64{}
for j := 0; j < cur.bat.RowCount(); j++ {
txn.batchSelectList[cur.bat] = append(txn.batchSelectList[cur.bat], int64(j))
}

}
}
txn.Unlock()
Expand Down

0 comments on commit 88803ef

Please sign in to comment.