Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt] optimize point queries #20409

Open
wants to merge 49 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
d6c79a9
[opt] optimize point queries
aunjgr Nov 27, 2024
8f6df77
Merge branch 'main' into opt_point
aunjgr Nov 28, 2024
c64e1e3
optimize DML point query
aunjgr Nov 28, 2024
a0933c7
Merge branch 'main' into opt_point
aunjgr Nov 28, 2024
469e17d
Merge branch 'main' into opt_point
aunjgr Nov 28, 2024
4c2744a
Merge branch 'main' into opt_point
aunjgr Nov 28, 2024
e8574ea
use off-heap memory
aunjgr Nov 28, 2024
eb511e0
code coverage
aunjgr Nov 28, 2024
0ae9cab
Merge branch 'main' into opt_point
aunjgr Nov 28, 2024
e00e451
Merge branch 'main' into opt_point
aunjgr Nov 28, 2024
56d4ca6
code coverage
aunjgr Nov 28, 2024
5000f25
fix a panic in multi-CN DEDUP join
aunjgr Nov 29, 2024
d9f9ff1
add ut
aunjgr Nov 29, 2024
0a4208b
Merge branch 'main' into fix_panic
aunjgr Nov 29, 2024
049600c
fix
aunjgr Nov 29, 2024
57e2eda
Merge branch 'fix_panic' into opt_point
aunjgr Nov 29, 2024
0332d98
fix ut
aunjgr Nov 29, 2024
3e70343
fix
aunjgr Nov 29, 2024
56fb40f
Merge branch 'fix_panic' into opt_point
aunjgr Nov 29, 2024
dbb61a8
Merge branch 'main' into opt_point
aunjgr Nov 29, 2024
5f6b0a6
Merge branch 'main' into opt_point
aunjgr Nov 29, 2024
0789b6e
Merge branch 'main' into opt_point
aunjgr Nov 29, 2024
880b0b3
don't use off-heap memory
aunjgr Nov 29, 2024
6db7802
Merge branch 'main' into opt_point
aunjgr Nov 29, 2024
d385dbf
fix
aunjgr Nov 29, 2024
49a2229
Merge branch 'main' into opt_point
aunjgr Nov 29, 2024
4b77475
Merge branch 'main' into opt_point
aunjgr Nov 30, 2024
f255555
Merge branch 'main' into opt_point
aunjgr Dec 2, 2024
1d1d621
fix a hung
aunjgr Dec 3, 2024
cd7a8dc
Merge branch 'main' into opt_point
aunjgr Dec 3, 2024
2d476eb
fix memory reuse
aunjgr Dec 3, 2024
b1f2d5b
Merge branch 'main' into opt_point
aunjgr Dec 3, 2024
359163b
Merge branch 'main' into opt_point
aunjgr Dec 4, 2024
dd960bb
temp fix
aunjgr Dec 4, 2024
87350b4
fix runtime filter bug
aunjgr Dec 4, 2024
03010fc
fix sca
aunjgr Dec 4, 2024
66a7559
Merge branch 'main' into opt_point
aunjgr Dec 6, 2024
ab27fd5
Merge branch 'main' into opt_point
aunjgr Dec 7, 2024
4d05087
delete temp fix
aunjgr Dec 7, 2024
17e450e
Merge branch 'main' into opt_point
aunjgr Dec 8, 2024
1b65a51
Merge branch 'main' into opt_point
aunjgr Dec 9, 2024
5c367a6
Merge branch 'main' into opt_point
aunjgr Dec 9, 2024
d7f4490
Merge branch 'main' into opt_point
aunjgr Dec 10, 2024
7d601b1
Merge branch 'main' into opt_point
aunjgr Dec 10, 2024
68c244a
Merge branch 'main' into opt_point
aunjgr Dec 10, 2024
3de8af4
Merge branch 'main' into opt_point
aunjgr Dec 11, 2024
9067b55
Merge branch 'main' into opt_point
aunjgr Dec 11, 2024
a2649d0
Merge branch 'main' into opt_point
aunjgr Dec 16, 2024
325b7fd
Merge branch 'main' into opt_point
aunjgr Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/container/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ import (
"bytes"
"context"
"fmt"
"github.com/matrixorigin/matrixone/pkg/common/bitmap"

"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"

"github.com/matrixorigin/matrixone/pkg/common/bitmap"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
)

func New(attrs []string) *Batch {
Expand Down
277 changes: 208 additions & 69 deletions pkg/sql/colexec/dedupjoin/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (dedupJoin *DedupJoin) build(analyzer process.Analyzer, proc *process.Proce
if dedupJoin.OnDuplicateAction != plan.Node_UPDATE {
ctr.matched.InitWithSize(ctr.batchRowCount)
} else {
ctr.matched.InitWithSize(int64(ctr.mp.GetGroupCount()) + 1)
ctr.matched.InitWithSize(int64(ctr.mp.GetGroupCount()))
}
}
return
Expand Down Expand Up @@ -194,10 +194,27 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error {
}
}

if ap.OnDuplicateAction != plan.Node_UPDATE {
if ap.OnDuplicateAction != plan.Node_UPDATE || ctr.mp.HashOnUnique() {
if ctr.matched.Count() == 0 {
ap.ctr.buf = ctr.batches
ctr.batches = nil
//ap.ctr.buf = ctr.batches
ap.ctr.buf = make([]*batch.Batch, len(ctr.batches))
for i := range ap.ctr.buf {
ap.ctr.buf[i] = batch.NewWithSize(len(ap.Result))
batSize := ctr.batches[i].Vecs[0].Length()
for j, rp := range ap.Result {
if rp.Rel == 1 {
ap.ctr.buf[i].SetVector(int32(j), ctr.batches[i].Vecs[rp.Pos])
ctr.batches[i].Vecs[rp.Pos] = nil
} else {
ap.ctr.buf[i].Vecs[j] = vector.NewVec(ap.LeftTypes[rp.Pos])
if err := vector.AppendMultiFixed(ap.ctr.buf[i].Vecs[j], 0, true, batSize, proc.Mp()); err != nil {
return err
}
}
}

ap.ctr.buf[i].SetRowCount(batSize)
}

return nil
}
Expand Down Expand Up @@ -255,45 +272,45 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error {
batCnt := (count-1)/colexec.DefaultBatchSize + 1
ap.ctr.buf = make([]*batch.Batch, batCnt)

fillCnt := 0
batIdx, rowIdx := 0, 0
for fillCnt < len(sels) {
batSize := colexec.DefaultBatchSize
if fillCnt+batSize > len(sels) {
batSize = len(sels) - fillCnt
}

ap.ctr.buf[batIdx] = batch.NewWithSize(len(ap.Result))
for i, rp := range ap.Result {
if rp.Rel == 1 {
ap.ctr.buf[batIdx].Vecs[i] = vector.NewVec(ap.RightTypes[rp.Pos])
for _, sel := range sels[fillCnt : fillCnt+batSize] {
idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
if err := ap.ctr.buf[batIdx].Vecs[i].UnionOne(ctr.batches[idx1].Vecs[rp.Pos], int64(idx2), proc.Mp()); err != nil {
return err
}
}
} else {
ap.ctr.buf[batIdx].Vecs[i] = vector.NewVec(ap.LeftTypes[rp.Pos])
if err := vector.AppendMultiFixed(ap.ctr.buf[batIdx].Vecs[i], 0, true, batSize, proc.Mp()); err != nil {
return err
}
}
}

ap.ctr.buf[batIdx].SetRowCount(batSize)
fillCnt += batSize
batIdx++
rowIdx = batSize % colexec.DefaultBatchSize
}
//fillCnt := 0
//for fillCnt < len(sels) {
// batSize := colexec.DefaultBatchSize
// if fillCnt+batSize > len(sels) {
// batSize = len(sels) - fillCnt
// }
//
// ap.ctr.buf[batIdx] = batch.NewWithSize(len(ap.Result))
// for i, rp := range ap.Result {
// if rp.Rel == 1 {
// ap.ctr.buf[batIdx].Vecs[i] = vector.NewVec(ap.RightTypes[rp.Pos])
// for _, sel := range sels[fillCnt : fillCnt+batSize] {
// idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
// if err := ap.ctr.buf[batIdx].Vecs[i].UnionOne(ctr.batches[idx1].Vecs[rp.Pos], int64(idx2), proc.Mp()); err != nil {
// return err
// }
// }
// } else {
// ap.ctr.buf[batIdx].Vecs[i] = vector.NewVec(ap.LeftTypes[rp.Pos])
// if err := vector.AppendMultiFixed(ap.ctr.buf[batIdx].Vecs[i], 0, true, batSize, proc.Mp()); err != nil {
// return err
// }
// }
// }
//
// ap.ctr.buf[batIdx].SetRowCount(batSize)
// fillCnt += batSize
// batIdx++
// rowIdx = batSize % colexec.DefaultBatchSize
//}

if ctr.joinBat1 != nil {
ctr.joinBat1.Clean(proc.GetMPool())
}
ctr.joinBat1, ctr.cfs1 = colexec.NewJoinBatch(ctr.batches[0], proc.Mp())

bitmapLen := uint64(ctr.matched.Len())
for i := uint64(1); i < bitmapLen; i++ {
for i := uint64(0); i < bitmapLen; i++ {
if ctr.matched.Contains(i) {
continue
}
Expand All @@ -309,41 +326,59 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error {
}
}

sels = ctr.mp.GetSels(i)
sels = ctr.mp.GetSels(i + 1)
idx1, idx2 := sels[0]/colexec.DefaultBatchSize, sels[0]%colexec.DefaultBatchSize
err := colexec.SetJoinBatchValues(ctr.joinBat1, ctr.batches[idx1], int64(idx2), 1, ctr.cfs1)
if err != nil {
return err
}

for _, sel := range sels[1:] {
idx1, idx2 = sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2)
if len(sels) == 1 {
for j, rp := range ap.Result {
if rp.Rel == 1 {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.batches[idx1].Vecs[rp.Pos], int64(idx2), proc.Mp()); err != nil {
return err
}
} else {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil {
return err
}
}
}
} else {
err := colexec.SetJoinBatchValues(ctr.joinBat1, ctr.batches[idx1], int64(idx2), 1, ctr.cfs1)
if err != nil {
return err
}

vecs := make([]*vector.Vector, len(ctr.exprExecs))
for j, exprExec := range ctr.exprExecs {
vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil)
if ctr.joinBat2 == nil {
ctr.joinBat2, ctr.cfs2 = colexec.NewJoinBatch(ctr.batches[0], proc.Mp())
}

for _, sel := range sels[1:] {
idx1, idx2 = sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2)
if err != nil {
return err
}
}

for j, pos := range ap.UpdateColIdxList {
ctr.joinBat1.Vecs[pos] = vecs[j]
}
}
vecs := make([]*vector.Vector, len(ctr.exprExecs))
for j, exprExec := range ctr.exprExecs {
vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil)
if err != nil {
return err
}
}

for j, rp := range ap.Result {
if rp.Rel == 1 {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.joinBat1.Vecs[rp.Pos], 0, proc.Mp()); err != nil {
return err
for j, pos := range ap.UpdateColIdxList {
ctr.joinBat1.Vecs[pos] = vecs[j]
}
} else {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil {
return err
}

for j, rp := range ap.Result {
if rp.Rel == 1 {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.joinBat1.Vecs[rp.Pos], 0, proc.Mp()); err != nil {
return err
}
} else {
if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil {
return err
}
}
}
}
Expand All @@ -367,17 +402,100 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce
if err != nil {
return err
}
if ctr.joinBat1 == nil {
ctr.joinBat1, ctr.cfs1 = colexec.NewJoinBatch(bat, proc.Mp())

if ap.OnDuplicateAction == plan.Node_UPDATE {
if ctr.joinBat1 == nil {
ctr.joinBat1, ctr.cfs1 = colexec.NewJoinBatch(bat, proc.Mp())
}
if ctr.joinBat2 == nil && ctr.batchRowCount > 0 {
ctr.joinBat2, ctr.cfs2 = colexec.NewJoinBatch(ctr.batches[0], proc.Mp())
}
}
if ctr.joinBat2 == nil && ctr.batchRowCount > 0 {
ctr.joinBat2, ctr.cfs2 = colexec.NewJoinBatch(ctr.batches[0], proc.Mp())

isPessimistic := proc.GetTxnOperator().Txn().IsPessimistic()

if ctr.mp.IsPointQuery() {
switch ap.OnDuplicateAction {
case plan.Node_FAIL:
// do nothing for txn.mode = Optimistic
if !isPessimistic {
break
}

var rowStr string
if len(ap.DedupColTypes) == 1 {
if ap.DedupColName == catalog.IndexTableIndexColName {
if ctr.vecs[0].GetType().Oid == types.T_varchar {
t, _, schema, err := types.DecodeTuple(ctr.vecs[0].GetBytesAt(0))
if err == nil && len(schema) > 1 {
rowStr = t.ErrString(make([]int32, len(schema)))
}
}
}

if len(rowStr) == 0 {
rowStr = ctr.vecs[0].RowToString(0)
}
} else {
rowItems, err := types.StringifyTuple(ctr.vecs[0].GetBytesAt(0), ap.DedupColTypes)
if err != nil {
return err
}
rowStr = "(" + strings.Join(rowItems, ",") + ")"
}
return moerr.NewDuplicateEntry(proc.Ctx, rowStr, ap.DedupColName)

case plan.Node_IGNORE:
ctr.matched.Add(0)

case plan.Node_UPDATE:
err := colexec.SetJoinBatchValues(ctr.joinBat1, bat, 0, 1, ctr.cfs1)
if err != nil {
return err
}

err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[0], 0, 1, ctr.cfs2)
if err != nil {
return err
}

vecs := make([]*vector.Vector, len(ctr.exprExecs))
for j, exprExec := range ctr.exprExecs {
vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil)
if err != nil {
return err
}
}

for j, pos := range ap.UpdateColIdxList {
ctr.joinBat1.Vecs[pos] = vecs[j]
}

for j, rp := range ap.Result {
if rp.Rel == 1 {
if err := ctr.rbat.Vecs[j].UnionOne(ctr.joinBat1.Vecs[rp.Pos], 0, proc.Mp()); err != nil {
return err
}
} else {
if err := ctr.rbat.Vecs[j].UnionOne(bat.Vecs[rp.Pos], 0, proc.Mp()); err != nil {
return err
}
}
}

ctr.matched.Add(0)
ctr.rbat.AddRowCount(1)
}

result.Batch = ctr.rbat
ap.ctr.lastPos = 0

return nil
}

rowCntInc := 0
count := bat.RowCount()
itr := ctr.mp.NewIterator()
isPessimistic := proc.GetTxnOperator().Txn().IsPessimistic()
for i := 0; i < count; i += hashmap.UnitLimit {
n := count - i
if n > hashmap.UnitLimit {
Expand Down Expand Up @@ -428,8 +546,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce
return err
}

sels := ctr.mp.GetSels(vals[k])
for _, sel := range sels {
if ctr.mp.HashOnUnique() {
sel := vals[k] - 1
idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2)
if err != nil {
Expand All @@ -447,6 +565,27 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce
for j, pos := range ap.UpdateColIdxList {
ctr.joinBat1.Vecs[pos] = vecs[j]
}
} else {
sels := ctr.mp.GetSels(vals[k])
for _, sel := range sels {
idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2)
if err != nil {
return err
}

vecs := make([]*vector.Vector, len(ctr.exprExecs))
for j, exprExec := range ctr.exprExecs {
vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil)
if err != nil {
return err
}
}

for j, pos := range ap.UpdateColIdxList {
ctr.joinBat1.Vecs[pos] = vecs[j]
}
}
}

for j, rp := range ap.Result {
Expand All @@ -461,7 +600,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce
}
}

ctr.matched.Add(vals[k])
ctr.matched.Add(vals[k] - 1)
rowCntInc++
}
}
Expand Down Expand Up @@ -495,9 +634,9 @@ func (dedupJoin *DedupJoin) resetRBat() {

for i, rp := range dedupJoin.Result {
if rp.Rel == 0 {
ctr.rbat.Vecs[i] = vector.NewOffHeapVecWithType(dedupJoin.LeftTypes[rp.Pos])
ctr.rbat.Vecs[i] = vector.NewVec(dedupJoin.LeftTypes[rp.Pos])
} else {
ctr.rbat.Vecs[i] = vector.NewOffHeapVecWithType(dedupJoin.RightTypes[rp.Pos])
ctr.rbat.Vecs[i] = vector.NewVec(dedupJoin.RightTypes[rp.Pos])
}
}
}
Expand Down
Loading
Loading