From d6c79a9c2fd048c66c38e6199ad87d362d048bf6 Mon Sep 17 00:00:00 2001 From: bRong Njam Date: Wed, 27 Nov 2024 17:51:46 +0800 Subject: [PATCH] [opt] optimize point queries in join operators, don't build hashtable if build side has only 1 row --- pkg/sql/colexec/dedupjoin/join.go | 116 ++++++++++++++----- pkg/sql/colexec/hashmap_util/hashmap_util.go | 17 ++- pkg/sql/plan/bind_insert.go | 2 +- 3 files changed, 93 insertions(+), 42 deletions(-) diff --git a/pkg/sql/colexec/dedupjoin/join.go b/pkg/sql/colexec/dedupjoin/join.go index 8baa99369be9c..49de0f62e6945 100644 --- a/pkg/sql/colexec/dedupjoin/join.go +++ b/pkg/sql/colexec/dedupjoin/join.go @@ -164,7 +164,7 @@ func (dedupJoin *DedupJoin) build(analyzer process.Analyzer, proc *process.Proce if dedupJoin.OnDuplicateAction != plan.Node_UPDATE { ctr.matched.InitWithSize(ctr.batchRowCount) } else { - ctr.matched.InitWithSize(int64(ctr.mp.GetGroupCount()) + 1) + ctr.matched.InitWithSize(int64(ctr.mp.GetGroupCount())) } } return @@ -194,10 +194,27 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error { } } - if ap.OnDuplicateAction != plan.Node_UPDATE { + if ap.OnDuplicateAction != plan.Node_UPDATE || ctr.mp.HashOnUnique() { if ctr.matched.Count() == 0 { - ap.ctr.buf = ctr.batches - ctr.batches = nil + ap.ctr.buf = make([]*batch.Batch, len(ctr.batches)) + for i := range ap.ctr.buf { + ap.ctr.buf[i] = batch.NewWithSize(len(ap.Result)) + batSize := ctr.batches[i].RowCount() + for j, rp := range ap.Result { + if rp.Rel == 1 { + ap.ctr.buf[i].Vecs[j] = vector.NewVec(ap.RightTypes[rp.Pos]) + if err := ap.ctr.buf[i].Vecs[j].UnionBatch(ctr.batches[i].Vecs[rp.Pos], 0, batSize, nil, proc.Mp()); err != nil { + return err + } + } else { + ap.ctr.buf[i].Vecs[j] = vector.NewVec(ap.LeftTypes[rp.Pos]) + if err := vector.AppendMultiFixed(ap.ctr.buf[i].Vecs[j], 0, true, batSize, proc.Mp()); err != nil { + return err + } + } + } + ap.ctr.buf[i].SetRowCount(batSize) + } return nil } @@ -293,7 +310,7 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error { ctr.joinBat1, ctr.cfs1 = colexec.NewJoinBatch(ctr.batches[0], proc.Mp()) bitmapLen := uint64(ctr.matched.Len()) - for i := uint64(1); i < bitmapLen; i++ { + for i := uint64(0); i < bitmapLen; i++ { if ctr.matched.Contains(i) { continue } @@ -309,41 +326,55 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error { } } - sels = ctr.mp.GetSels(i) + sels = ctr.mp.GetSels(i + 1) idx1, idx2 := sels[0]/colexec.DefaultBatchSize, sels[0]%colexec.DefaultBatchSize - err := colexec.SetJoinBatchValues(ctr.joinBat1, ctr.batches[idx1], int64(idx2), 1, ctr.cfs1) - if err != nil { - return err - } - - for _, sel := range sels[1:] { - idx1, idx2 = sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize - err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2) + if len(sels) == 1 { + for j, rp := range ap.Result { + if rp.Rel == 1 { + if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.batches[idx1].Vecs[rp.Pos], int64(idx2), proc.Mp()); err != nil { + return err + } + } else { + if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil { + return err + } + } + } + } else { + err := colexec.SetJoinBatchValues(ctr.joinBat1, ctr.batches[idx1], int64(idx2), 1, ctr.cfs1) if err != nil { return err } - vecs := make([]*vector.Vector, len(ctr.exprExecs)) - for j, exprExec := range ctr.exprExecs { - vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil) + for _, sel := range sels[1:] { + idx1, idx2 = sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize + err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2) if err != nil { return err } - } - for j, pos := range ap.UpdateColIdxList { - ctr.joinBat1.Vecs[pos] = vecs[j] - } - } + vecs := make([]*vector.Vector, len(ctr.exprExecs)) + for j, exprExec := range ctr.exprExecs { + vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil) + if err != nil { + return err + } + } - for j, rp := range ap.Result { - if rp.Rel == 1 { - if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.joinBat1.Vecs[rp.Pos], 0, proc.Mp()); err != nil { - return err + for j, pos := range ap.UpdateColIdxList { + ctr.joinBat1.Vecs[pos] = vecs[j] } - } else { - if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil { - return err + } + + for j, rp := range ap.Result { + if rp.Rel == 1 { + if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.joinBat1.Vecs[rp.Pos], 0, proc.Mp()); err != nil { + return err + } + } else { + if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil { + return err + } } } } @@ -428,8 +459,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce return err } - sels := ctr.mp.GetSels(vals[k]) - for _, sel := range sels { + if ctr.mp.HashOnUnique() { + sel := vals[k] - 1 idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2) if err != nil { @@ -447,6 +478,27 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce for j, pos := range ap.UpdateColIdxList { ctr.joinBat1.Vecs[pos] = vecs[j] } + } else { + sels := ctr.mp.GetSels(vals[k]) + for _, sel := range sels { + idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize + err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2) + if err != nil { + return err + } + + vecs := make([]*vector.Vector, len(ctr.exprExecs)) + for j, exprExec := range ctr.exprExecs { + vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil) + if err != nil { + return err + } + } + + for j, pos := range ap.UpdateColIdxList { + ctr.joinBat1.Vecs[pos] = vecs[j] + } + } } for j, rp := range ap.Result { @@ -461,7 +513,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce } } - ctr.matched.Add(vals[k]) + ctr.matched.Add(vals[k] - 1) rowCntInc++ } } diff --git a/pkg/sql/colexec/hashmap_util/hashmap_util.go b/pkg/sql/colexec/hashmap_util/hashmap_util.go index 7b1562382d666..75237909605b2 100644 --- a/pkg/sql/colexec/hashmap_util/hashmap_util.go +++ b/pkg/sql/colexec/hashmap_util/hashmap_util.go @@ -370,16 +370,15 @@ func (hb *HashmapBuilder) BuildHashmap(hashOnPK bool, needAllocateSels bool, nee // if groupcount == inputrowcount, it means building hashmap on unique rows // we can free sels now - if !hb.IsDedup { - if hb.keyWidth <= 8 { - if hb.InputBatchRowCount == int(hb.IntHashMap.GroupCount()) { - hb.MultiSels.Free() - } - } else { - if hb.InputBatchRowCount == int(hb.StrHashMap.GroupCount()) { - hb.MultiSels.Free() - } + if hb.keyWidth <= 8 { + if hb.InputBatchRowCount == int(hb.IntHashMap.GroupCount()) { + hb.MultiSels.Free() + } + } else { + if hb.InputBatchRowCount == int(hb.StrHashMap.GroupCount()) { + hb.MultiSels.Free() } } + return nil } diff --git a/pkg/sql/plan/bind_insert.go b/pkg/sql/plan/bind_insert.go index 19cc5ed3d28fc..39bdb93b65c6d 100644 --- a/pkg/sql/plan/bind_insert.go +++ b/pkg/sql/plan/bind_insert.go @@ -237,7 +237,7 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert( // handle primary/unique key confliction if builder.canSkipDedup(dmlCtx.tableDefs[0]) { - // load do not handle primary/unique key confliction + // restore/load do not handle primary/unique key confliction for i, idxDef := range tableDef.Indexes { if !idxDef.TableExist || skipUniqueIdx[i] { continue