diff --git a/pkg/container/batch/batch.go b/pkg/container/batch/batch.go
index b4404866b9a9c..005de5055ee49 100644
--- a/pkg/container/batch/batch.go
+++ b/pkg/container/batch/batch.go
@@ -18,14 +18,13 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"github.com/matrixorigin/matrixone/pkg/common/bitmap"
-
-	"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
 
+	"github.com/matrixorigin/matrixone/pkg/common/bitmap"
 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
 	"github.com/matrixorigin/matrixone/pkg/common/mpool"
 	"github.com/matrixorigin/matrixone/pkg/container/types"
 	"github.com/matrixorigin/matrixone/pkg/container/vector"
+	"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
 )
 
 func New(attrs []string) *Batch {
diff --git a/pkg/sql/colexec/dedupjoin/join.go b/pkg/sql/colexec/dedupjoin/join.go
index 8baa99369be9c..808e11ba215d1 100644
--- a/pkg/sql/colexec/dedupjoin/join.go
+++ b/pkg/sql/colexec/dedupjoin/join.go
@@ -164,7 +164,7 @@ func (dedupJoin *DedupJoin) build(analyzer process.Analyzer, proc *process.Proce
 		if dedupJoin.OnDuplicateAction != plan.Node_UPDATE {
 			ctr.matched.InitWithSize(ctr.batchRowCount)
 		} else {
-			ctr.matched.InitWithSize(int64(ctr.mp.GetGroupCount()) + 1)
+			ctr.matched.InitWithSize(int64(ctr.mp.GetGroupCount()))
 		}
 	}
 	return
@@ -194,10 +194,27 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error {
 		}
 	}
 
-	if ap.OnDuplicateAction != plan.Node_UPDATE {
+	if ap.OnDuplicateAction != plan.Node_UPDATE || ctr.mp.HashOnUnique() {
 		if ctr.matched.Count() == 0 {
-			ap.ctr.buf = ctr.batches
-			ctr.batches = nil
+			ap.ctr.buf = make([]*batch.Batch, len(ctr.batches))
+			for i := range ap.ctr.buf {
+				ap.ctr.buf[i] = batch.NewWithSize(len(ap.Result))
+				batSize := ctr.batches[i].Vecs[0].Length()
+				for j, rp := range ap.Result {
+					if rp.Rel == 1 {
+						ap.ctr.buf[i].SetVector(int32(j), ctr.batches[i].Vecs[rp.Pos])
+						ctr.batches[i].Vecs[rp.Pos] = nil
+					} else {
+						ap.ctr.buf[i].Vecs[j] = vector.NewVec(ap.LeftTypes[rp.Pos])
+						if err := vector.AppendMultiFixed(ap.ctr.buf[i].Vecs[j], 0, true, batSize, proc.Mp()); err != nil {
+							return err
+						}
+					}
+				}
+
+				ap.ctr.buf[i].SetRowCount(batSize)
+			}
 
 			return nil
 		}
@@ -255,37 +272,37 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error {
 	batCnt := (count-1)/colexec.DefaultBatchSize + 1
 	ap.ctr.buf = make([]*batch.Batch, batCnt)
-	fillCnt := 0
 	batIdx, rowIdx := 0, 0
-	for fillCnt < len(sels) {
-		batSize := colexec.DefaultBatchSize
-		if fillCnt+batSize > len(sels) {
-			batSize = len(sels) - fillCnt
-		}
-
-		ap.ctr.buf[batIdx] = batch.NewWithSize(len(ap.Result))
-		for i, rp := range ap.Result {
-			if rp.Rel == 1 {
-				ap.ctr.buf[batIdx].Vecs[i] = vector.NewVec(ap.RightTypes[rp.Pos])
-				for _, sel := range sels[fillCnt : fillCnt+batSize] {
-					idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
-					if err := ap.ctr.buf[batIdx].Vecs[i].UnionOne(ctr.batches[idx1].Vecs[rp.Pos], int64(idx2), proc.Mp()); err != nil {
-						return err
-					}
-				}
-			} else {
-				ap.ctr.buf[batIdx].Vecs[i] = vector.NewVec(ap.LeftTypes[rp.Pos])
-				if err := vector.AppendMultiFixed(ap.ctr.buf[batIdx].Vecs[i], 0, true, batSize, proc.Mp()); err != nil {
-					return err
-				}
-			}
-		}
-
-		ap.ctr.buf[batIdx].SetRowCount(batSize)
-		fillCnt += batSize
-		batIdx++
-		rowIdx = batSize % colexec.DefaultBatchSize
-	}
 
 	if ctr.joinBat1 != nil {
 		ctr.joinBat1.Clean(proc.GetMPool())
@@ -293,7 +310,7 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error {
 	ctr.joinBat1, ctr.cfs1 = colexec.NewJoinBatch(ctr.batches[0], proc.Mp())
 
 	bitmapLen := uint64(ctr.matched.Len())
-	for i := uint64(1); i < bitmapLen; i++ {
+	for i := uint64(0); i < bitmapLen; i++ {
 		if ctr.matched.Contains(i) {
 			continue
 		}
@@ -309,41 +326,59 @@ func (ctr *container) finalize(ap *DedupJoin, proc *process.Process) error {
 		}
 	}
 
-		sels = ctr.mp.GetSels(i)
+		sels = ctr.mp.GetSels(i + 1)
 		idx1, idx2 := sels[0]/colexec.DefaultBatchSize, sels[0]%colexec.DefaultBatchSize
-		err := colexec.SetJoinBatchValues(ctr.joinBat1, ctr.batches[idx1], int64(idx2), 1, ctr.cfs1)
-		if err != nil {
-			return err
-		}
-
-		for _, sel := range sels[1:] {
-			idx1, idx2 = sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
-			err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2)
+		if len(sels) == 1 {
+			for j, rp := range ap.Result {
+				if rp.Rel == 1 {
+					if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.batches[idx1].Vecs[rp.Pos], int64(idx2), proc.Mp()); err != nil {
+						return err
+					}
+				} else {
+					if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil {
+						return err
+					}
+				}
+			}
+		} else {
+			err := colexec.SetJoinBatchValues(ctr.joinBat1, ctr.batches[idx1], int64(idx2), 1, ctr.cfs1)
 			if err != nil {
 				return err
 			}
 
-			vecs := make([]*vector.Vector, len(ctr.exprExecs))
-			for j, exprExec := range ctr.exprExecs {
-				vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil)
+			if ctr.joinBat2 == nil {
+				ctr.joinBat2, ctr.cfs2 = colexec.NewJoinBatch(ctr.batches[0], proc.Mp())
+			}
+
+			for _, sel := range sels[1:] {
+				idx1, idx2 = sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize
+				err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2)
 				if err != nil {
 					return err
 				}
-			}
 
-			for j, pos := range ap.UpdateColIdxList {
-				ctr.joinBat1.Vecs[pos] = vecs[j]
-			}
-		}
+				vecs := make([]*vector.Vector, len(ctr.exprExecs))
+				for j, exprExec := range ctr.exprExecs {
+					vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil)
+					if err != nil {
+						return err
+					}
+				}
 
-		for j, rp := range ap.Result {
-			if rp.Rel == 1 {
-				if err := ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.joinBat1.Vecs[rp.Pos], 0, proc.Mp()); err != nil {
-					return err
+				for j, pos := range ap.UpdateColIdxList {
+					ctr.joinBat1.Vecs[pos] = vecs[j]
+				}
-			} else {
-				if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil {
-					return err
+			}
+
+			for j, rp := range ap.Result {
+				if rp.Rel == 1 {
+					if err := 
ap.ctr.buf[batIdx].Vecs[j].UnionOne(ctr.joinBat1.Vecs[rp.Pos], 0, proc.Mp()); err != nil { + return err + } + } else { + if err := ap.ctr.buf[batIdx].Vecs[j].UnionNull(proc.Mp()); err != nil { + return err + } } } } @@ -367,17 +402,100 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce if err != nil { return err } - if ctr.joinBat1 == nil { - ctr.joinBat1, ctr.cfs1 = colexec.NewJoinBatch(bat, proc.Mp()) + + if ap.OnDuplicateAction == plan.Node_UPDATE { + if ctr.joinBat1 == nil { + ctr.joinBat1, ctr.cfs1 = colexec.NewJoinBatch(bat, proc.Mp()) + } + if ctr.joinBat2 == nil && ctr.batchRowCount > 0 { + ctr.joinBat2, ctr.cfs2 = colexec.NewJoinBatch(ctr.batches[0], proc.Mp()) + } } - if ctr.joinBat2 == nil && ctr.batchRowCount > 0 { - ctr.joinBat2, ctr.cfs2 = colexec.NewJoinBatch(ctr.batches[0], proc.Mp()) + + isPessimistic := proc.GetTxnOperator().Txn().IsPessimistic() + + if ctr.mp.IsPointQuery() { + switch ap.OnDuplicateAction { + case plan.Node_FAIL: + // do nothing for txn.mode = Optimistic + if !isPessimistic { + break + } + + var rowStr string + if len(ap.DedupColTypes) == 1 { + if ap.DedupColName == catalog.IndexTableIndexColName { + if ctr.vecs[0].GetType().Oid == types.T_varchar { + t, _, schema, err := types.DecodeTuple(ctr.vecs[0].GetBytesAt(0)) + if err == nil && len(schema) > 1 { + rowStr = t.ErrString(make([]int32, len(schema))) + } + } + } + + if len(rowStr) == 0 { + rowStr = ctr.vecs[0].RowToString(0) + } + } else { + rowItems, err := types.StringifyTuple(ctr.vecs[0].GetBytesAt(0), ap.DedupColTypes) + if err != nil { + return err + } + rowStr = "(" + strings.Join(rowItems, ",") + ")" + } + return moerr.NewDuplicateEntry(proc.Ctx, rowStr, ap.DedupColName) + + case plan.Node_IGNORE: + ctr.matched.Add(0) + + case plan.Node_UPDATE: + err := colexec.SetJoinBatchValues(ctr.joinBat1, bat, 0, 1, ctr.cfs1) + if err != nil { + return err + } + + err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[0], 0, 1, ctr.cfs2) + if err != nil { + return err + } + + vecs := make([]*vector.Vector, len(ctr.exprExecs)) + for j, exprExec := range ctr.exprExecs { + vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil) + if err != nil { + return err + } + } + + for j, pos := range ap.UpdateColIdxList { + ctr.joinBat1.Vecs[pos] = vecs[j] + } + + for j, rp := range ap.Result { + if rp.Rel == 1 { + if err := ctr.rbat.Vecs[j].UnionOne(ctr.joinBat1.Vecs[rp.Pos], 0, proc.Mp()); err != nil { + return err + } + } else { + if err := ctr.rbat.Vecs[j].UnionOne(bat.Vecs[rp.Pos], 0, proc.Mp()); err != nil { + return err + } + } + } + + ctr.matched.Add(0) + ctr.rbat.AddRowCount(1) + } + + result.Batch = ctr.rbat + ap.ctr.lastPos = 0 + + return nil } rowCntInc := 0 count := bat.RowCount() itr := ctr.mp.NewIterator() - isPessimistic := proc.GetTxnOperator().Txn().IsPessimistic() for i := 0; i < count; i += hashmap.UnitLimit { n := count - i if n > hashmap.UnitLimit { @@ -428,8 +546,8 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce return err } - sels := ctr.mp.GetSels(vals[k]) - for _, sel := range sels { + if ctr.mp.HashOnUnique() { + sel := vals[k] - 1 idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2) if err != nil { @@ -447,6 +565,27 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce for j, pos := range ap.UpdateColIdxList { ctr.joinBat1.Vecs[pos] = vecs[j] } + } else { 
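+				// non-unique build side: vals[k] is the 1-based group id for this probe
+				// row, so GetSels(vals[k]) returns every matched build row, and the
+				// ON DUPLICATE KEY UPDATE expressions are applied to each one in turn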
+ sels := ctr.mp.GetSels(vals[k]) + for _, sel := range sels { + idx1, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize + err = colexec.SetJoinBatchValues(ctr.joinBat2, ctr.batches[idx1], int64(idx2), 1, ctr.cfs2) + if err != nil { + return err + } + + vecs := make([]*vector.Vector, len(ctr.exprExecs)) + for j, exprExec := range ctr.exprExecs { + vecs[j], err = exprExec.Eval(proc, []*batch.Batch{ctr.joinBat1, ctr.joinBat2}, nil) + if err != nil { + return err + } + } + + for j, pos := range ap.UpdateColIdxList { + ctr.joinBat1.Vecs[pos] = vecs[j] + } + } } for j, rp := range ap.Result { @@ -461,7 +600,7 @@ func (ctr *container) probe(bat *batch.Batch, ap *DedupJoin, proc *process.Proce } } - ctr.matched.Add(vals[k]) + ctr.matched.Add(vals[k] - 1) rowCntInc++ } } @@ -495,9 +634,9 @@ func (dedupJoin *DedupJoin) resetRBat() { for i, rp := range dedupJoin.Result { if rp.Rel == 0 { - ctr.rbat.Vecs[i] = vector.NewOffHeapVecWithType(dedupJoin.LeftTypes[rp.Pos]) + ctr.rbat.Vecs[i] = vector.NewVec(dedupJoin.LeftTypes[rp.Pos]) } else { - ctr.rbat.Vecs[i] = vector.NewOffHeapVecWithType(dedupJoin.RightTypes[rp.Pos]) + ctr.rbat.Vecs[i] = vector.NewVec(dedupJoin.RightTypes[rp.Pos]) } } } diff --git a/pkg/sql/colexec/dedupjoin/join_test.go b/pkg/sql/colexec/dedupjoin/join_test.go index a43967b250725..2c57de20287fe 100644 --- a/pkg/sql/colexec/dedupjoin/join_test.go +++ b/pkg/sql/colexec/dedupjoin/join_test.go @@ -153,52 +153,6 @@ func TestDedupJoin(t *testing.T) { } } -/* - func BenchmarkJoin(b *testing.B) { - for i := 0; i < b.N; i++ { - tcs = []joinTestCase{ - newTestCase([]bool{false}, []types.Type{types.T_int8.ToType()}, []int32{0}, - [][]*plan.Expr{ - { - newExpr(0, types.T_int8.ToType()), - }, - { - newExpr(0, types.T_int8.ToType()), - }, - }), - newTestCase([]bool{true}, []types.Type{types.T_int8.ToType()}, []int32{0}, - [][]*plan.Expr{ - { - newExpr(0, types.T_int8.ToType()), - }, - { - newExpr(0, types.T_int8.ToType()), - }, - }), - } - t := new(testing.T) - for _, tc := range tcs { - bats := hashBuild(t, tc) - err := tc.arg.Prepare(tc.proc) - require.NoError(t, err) - tc.proc.Reg.MergeReceivers[0].Ch <- testutil.NewRegMsg(newBatch(tc.types, tc.proc, Rows)) - tc.proc.Reg.MergeReceivers[0].Ch <- testutil.NewRegMsg(batch.EmptyBatch) - tc.proc.Reg.MergeReceivers[0].Ch <- testutil.NewRegMsg(newBatch(tc.types, tc.proc, Rows)) - tc.proc.Reg.MergeReceivers[0].Ch <- testutil.NewRegMsg(newBatch(tc.types, tc.proc, Rows)) - tc.proc.Reg.MergeReceivers[0].Ch <- testutil.NewRegMsg(newBatch(tc.types, tc.proc, Rows)) - tc.proc.Reg.MergeReceivers[0].Ch <- nil - tc.proc.Reg.MergeReceivers[1].Ch <- testutil.NewRegMsg(bats[0]) - tc.proc.Reg.MergeReceivers[1].Ch <- testutil.NewRegMsg(bats[1]) - for { - ok, err := tc.arg.Call(tc.proc) - if ok.Status == vm.ExecStop || err != nil { - break - } - } - } - } - } -*/ func newExpr(pos int32, typ types.Type) *plan.Expr { return &plan.Expr{ Typ: plan.Type{ diff --git a/pkg/sql/colexec/hashmap_util/hashmap_util.go b/pkg/sql/colexec/hashmap_util/hashmap_util.go index 7b1562382d666..252dda42d1ac0 100644 --- a/pkg/sql/colexec/hashmap_util/hashmap_util.go +++ b/pkg/sql/colexec/hashmap_util/hashmap_util.go @@ -189,6 +189,24 @@ func (hb *HashmapBuilder) BuildHashmap(hashOnPK bool, needAllocateSels bool, nee return err } + if hb.IsDedup && hb.InputBatchRowCount == 1 && needUniqueVec { + if len(hb.UniqueJoinKeys) == 0 { + hb.UniqueJoinKeys = make([]*vector.Vector, len(hb.executor)) + for j, vec := range hb.vecs[0] { + hb.UniqueJoinKeys[j] = 
vector.NewVec(*vec.GetType()) + } + } + + for i, vec := range hb.vecs[0] { + err = hb.UniqueJoinKeys[i].UnionOne(vec, 0, proc.Mp()) + if err != nil { + return err + } + } + + return nil + } + var itr hashmap.Iterator if hb.keyWidth <= 8 { if hb.IntHashMap, err = hashmap.NewIntHashMap(false); err != nil { @@ -228,7 +246,7 @@ func (hb *HashmapBuilder) BuildHashmap(hashOnPK bool, needAllocateSels bool, nee var ( cardinality uint64 - sels []int32 + newSels []int64 ) vOld := uint64(0) @@ -336,25 +354,22 @@ func (hb *HashmapBuilder) BuildHashmap(hashOnPK bool, needAllocateSels bool, nee } } } else { - if sels == nil { - sels = make([]int32, hashmap.UnitLimit) + if newSels == nil { + newSels = make([]int64, hashmap.UnitLimit) } - sels = sels[:0] + newSels = newSels[:0] for j, v := range vals[:n] { if v > cardinality { - sels = append(sels, int32(i+j)) + newSels = append(newSels, int64(vecIdx2+j)) cardinality = v } } for j, vec := range hb.vecs[vecIdx1] { - for _, sel := range sels { - _, idx2 := sel/colexec.DefaultBatchSize, sel%colexec.DefaultBatchSize - err = hb.UniqueJoinKeys[j].UnionOne(vec, int64(idx2), proc.Mp()) - if err != nil { - return err - } + err = hb.UniqueJoinKeys[j].Union(vec, newSels, proc.Mp()) + if err != nil { + return err } } } @@ -370,16 +385,15 @@ func (hb *HashmapBuilder) BuildHashmap(hashOnPK bool, needAllocateSels bool, nee // if groupcount == inputrowcount, it means building hashmap on unique rows // we can free sels now - if !hb.IsDedup { - if hb.keyWidth <= 8 { - if hb.InputBatchRowCount == int(hb.IntHashMap.GroupCount()) { - hb.MultiSels.Free() - } - } else { - if hb.InputBatchRowCount == int(hb.StrHashMap.GroupCount()) { - hb.MultiSels.Free() - } + if hb.keyWidth <= 8 { + if hb.InputBatchRowCount == int(hb.IntHashMap.GroupCount()) { + hb.MultiSels.Free() + } + } else { + if hb.InputBatchRowCount == int(hb.StrHashMap.GroupCount()) { + hb.MultiSels.Free() } } + return nil } diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 02c484666c4e3..b498067ababcd 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -21,24 +21,15 @@ import ( "sync" "time" - "github.com/matrixorigin/matrixone/pkg/logutil" - - "github.com/matrixorigin/matrixone/pkg/sql/colexec/group" - - "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergegroup" - - "github.com/matrixorigin/matrixone/pkg/fileservice" - "github.com/matrixorigin/matrixone/pkg/objectio" - - "github.com/panjf2000/ants/v2" - "go.uber.org/zap" - "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/cnservice/cnclient" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/reuse" "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/defines" + "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/objectio" pbpipeline "github.com/matrixorigin/matrixone/pkg/pb/pipeline" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" @@ -46,6 +37,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/sql/colexec" "github.com/matrixorigin/matrixone/pkg/sql/colexec/dispatch" "github.com/matrixorigin/matrixone/pkg/sql/colexec/filter" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/group" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergegroup" "github.com/matrixorigin/matrixone/pkg/sql/colexec/output" 
"github.com/matrixorigin/matrixone/pkg/sql/colexec/table_scan" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" @@ -57,6 +50,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/message" "github.com/matrixorigin/matrixone/pkg/vm/pipeline" "github.com/matrixorigin/matrixone/pkg/vm/process" + "github.com/panjf2000/ants/v2" + "go.uber.org/zap" ) func newScope(magic magicType) *Scope { diff --git a/pkg/sql/plan/bind_insert.go b/pkg/sql/plan/bind_insert.go index 0f191410a280c..d56f324578f53 100644 --- a/pkg/sql/plan/bind_insert.go +++ b/pkg/sql/plan/bind_insert.go @@ -240,7 +240,7 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert( // handle primary/unique key confliction if builder.canSkipDedup(dmlCtx.tableDefs[0]) { - // load do not handle primary/unique key confliction + // restore/load do not handle primary/unique key confliction for i, idxDef := range tableDef.Indexes { if !idxDef.TableExist || skipUniqueIdx[i] { continue diff --git a/pkg/sql/plan/build_constraint_util.go b/pkg/sql/plan/build_constraint_util.go index 3906bbb130907..0ff39d1ef36e7 100644 --- a/pkg/sql/plan/build_constraint_util.go +++ b/pkg/sql/plan/build_constraint_util.go @@ -1464,16 +1464,6 @@ func appendPrimaryConstraintPlan( } if needCheck && useFuzzyFilter { - rfTag := builder.genNewMsgTag() - probeExpr := &plan.Expr{ - Typ: pkTyp, - Expr: &plan.Expr_Col{ - Col: &plan.ColRef{ - Name: tableDef.Pkey.PkeyColName, - ColPos: int32(pkPos), - }, - }, - } // sink_scan sinkScanNode := &Node{ NodeType: plan.Node_SINK_SCAN, @@ -1516,6 +1506,17 @@ func appendPrimaryConstraintPlan( scanTableDef.Partition.PartitionExpression = partitionExpr } + rfTag := builder.genNewMsgTag() + probeExpr := &plan.Expr{ + Typ: pkTyp, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + Name: tableDef.Pkey.PkeyColName, + ColPos: int32(len(scanTableDef.Cols) - 1), + }, + }, + } + scanNode := &plan.Node{ NodeType: plan.Node_TABLE_SCAN, Stats: &plan.Stats{}, diff --git a/pkg/vm/message/joinMapMsg.go b/pkg/vm/message/joinMapMsg.go index 5c1ffd5545065..9a8cfc2f65eb6 100644 --- a/pkg/vm/message/joinMapMsg.go +++ b/pkg/vm/message/joinMapMsg.go @@ -78,15 +78,19 @@ type JoinMap struct { func NewJoinMap(sels JoinSels, ihm *hashmap.IntHashMap, shm *hashmap.StrHashMap, batches []*batch.Batch, m *mpool.MPool) *JoinMap { return &JoinMap{ + valid: true, shm: shm, ihm: ihm, + mpool: m, multiSels: sels, batches: batches, - mpool: m, - valid: true, } } +func (jm *JoinMap) IsPointQuery() bool { + return jm.runtimeFilter_In && jm.shm == nil && jm.ihm == nil +} + func (jm *JoinMap) GetBatches() []*batch.Batch { if jm == nil { return nil @@ -115,8 +119,13 @@ func (jm *JoinMap) GetRowCount() int64 { func (jm *JoinMap) GetGroupCount() uint64 { if jm.ihm != nil { return jm.ihm.GroupCount() + } else if jm.shm != nil { + return jm.shm.GroupCount() + } else if jm.runtimeFilter_In { + return 1 } - return jm.shm.GroupCount() + + return 0 } func (jm *JoinMap) SetPushedRuntimeFilterIn(b bool) { diff --git a/test/distributed/cases/dml/insert/insert_duplicate.result b/test/distributed/cases/dml/insert/insert_duplicate.result index 8b7c5ea72012e..85afe8042a90f 100644 --- a/test/distributed/cases/dml/insert/insert_duplicate.result +++ b/test/distributed/cases/dml/insert/insert_duplicate.result @@ -391,3 +391,19 @@ id biz_type namespace sn SELECT * FROM `serial_numbers` WHERE `biz_type` = 4 and `namespace` = '2024091117'; id biz_type namespace sn 100001 4 2024091117 1 +drop table if exists indup_11; +create table indup_11(a int primary key, b int); +insert 
into indup_11 values(1, 1), (1, 1), (2, 2) on duplicate key update b = b + 10; +select * from indup_11 where b; +a b +1 11 +2 2 +insert ignore into indup_11(a) select result from generate_series(1, 100000) g; +insert into indup_11 values(1, 1); +Duplicate entry '1' for key 'a' +insert ignore into indup_11 values(1, 1); +insert into indup_11 values(1, 1) on duplicate key update b = a + 100; +select * from indup_11 where b is not null order by a; +a b +1 101 +2 2 diff --git a/test/distributed/cases/dml/insert/insert_duplicate.sql b/test/distributed/cases/dml/insert/insert_duplicate.sql index 5aba9cd55abf8..a52b0177d2e1e 100644 --- a/test/distributed/cases/dml/insert/insert_duplicate.sql +++ b/test/distributed/cases/dml/insert/insert_duplicate.sql @@ -233,3 +233,13 @@ insert into `serial_numbers` (`biz_type`, `namespace`, `sn`) select result,resul INSERT INTO `serial_numbers` (`biz_type`, `namespace`, `sn`) VALUES (4, '2024091117', 1) ON DUPLICATE KEY UPDATE `sn` = `sn` + 1; SELECT * FROM `serial_numbers` WHERE `biz_type` = 4 ; SELECT * FROM `serial_numbers` WHERE `biz_type` = 4 and `namespace` = '2024091117'; + +drop table if exists indup_11; +create table indup_11(a int primary key, b int); +insert into indup_11 values(1, 1), (1, 1), (2, 2) on duplicate key update b = b + 10; +select * from indup_11 where b; +insert ignore into indup_11(a) select result from generate_series(1, 100000) g; +insert into indup_11 values(1, 1); +insert ignore into indup_11 values(1, 1); +insert into indup_11 values(1, 1) on duplicate key update b = a + 100; +select * from indup_11 where b is not null order by a;
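
Note on the bitmap re-indexing in the dedupjoin changes above: this patch treats the group ids handed back by the hashmaps as 1-based, with 0 reserved for "no match". That is why the matched bitmap can now be sized to exactly `GetGroupCount()` (instead of `GetGroupCount() + 1`), marked via `ctr.matched.Add(vals[k] - 1)`, and scanned from slot 0 with `ctr.mp.GetSels(i + 1)` mapping back to the group. A minimal, self-contained sketch of that convention (toy map and bitmap, not MatrixOne's actual hashmap API):

```go
package main

import "fmt"

// toyHashMap stands in for the int/str hashmaps used by the join:
// group ids are 1-based, and 0 means "key not present".
type toyHashMap struct {
	groups map[string]uint64
}

func (m *toyHashMap) Insert(key string) uint64 {
	if id, ok := m.groups[key]; ok {
		return id
	}
	id := uint64(len(m.groups) + 1) // first distinct key gets id 1
	m.groups[key] = id
	return id
}

func (m *toyHashMap) Find(key string) uint64 {
	return m.groups[key] // zero value 0 signals "not in the build side"
}

func main() {
	// Build side: three distinct keys -> group count 3.
	m := &toyHashMap{groups: map[string]uint64{}}
	for _, k := range []string{"a", "b", "c"} {
		m.Insert(k)
	}

	// matched plays the role of ctr.matched, sized by group count (no +1),
	// because slot i stores the state of group id i+1.
	matched := make([]bool, len(m.groups))

	// Probe side: only "b" and "c" hit the build side; "x" yields id 0.
	for _, k := range []string{"b", "c", "x"} {
		if id := m.Find(k); id > 0 {
			matched[id-1] = true // same shift as ctr.matched.Add(vals[k] - 1)
		}
	}

	// The finalize-style scan starts at slot 0 and maps back with i+1,
	// mirroring the loop over the bitmap and GetSels(i + 1) in the patch.
	for i, hit := range matched {
		if !hit {
			fmt.Printf("group id %d (bitmap slot %d) never matched\n", i+1, i)
		}
	}
}
```

Running the sketch prints `group id 1 (bitmap slot 0) never matched`, which mirrors how finalize walks the bitmap to emit build rows that no probe row touched.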