From 2cb4a7141e2a05eb871dc183554463f7d8493493 Mon Sep 17 00:00:00 2001 From: iamlinjunhong <49111204+iamlinjunhong@users.noreply.github.com> Date: Fri, 17 Jan 2025 22:14:35 +0800 Subject: [PATCH] fix operator of partition (#21274) fix operator of partition Approved by: @ouyuanning, @aunjgr, @sukki37, @badboynt1 --- pkg/sql/colexec/mock_util.go | 14 +++++++++ pkg/sql/colexec/partition/partition.go | 35 ++++++++++++++++++++- pkg/sql/colexec/partition/partition_test.go | 6 ++-- 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/pkg/sql/colexec/mock_util.go b/pkg/sql/colexec/mock_util.go index 1878bbe1285fc..14a3c94aa03a1 100644 --- a/pkg/sql/colexec/mock_util.go +++ b/pkg/sql/colexec/mock_util.go @@ -123,6 +123,20 @@ func MakeMockBatchs() *batch.Batch { return bat } +func MakeMockPartitionBatchs(val int32) *batch.Batch { + bat := batch.New([]string{"a"}) + vecs := makeMockPartitionVecs(val) + bat.Vecs = vecs + bat.SetRowCount(vecs[0].Length()) + return bat +} + +func makeMockPartitionVecs(val int32) []*vector.Vector { + vecs := make([]*vector.Vector, 1) + vecs[0] = testutil.MakeInt32Vector([]int32{val, val, val}, nil) + return vecs +} + func makeMockTimeWinVecs() []*vector.Vector { vecs := make([]*vector.Vector, 2) vecs[0] = testutil.MakeDatetimeVector([]string{ diff --git a/pkg/sql/colexec/partition/partition.go b/pkg/sql/colexec/partition/partition.go index 8af6d71e071a2..c5988547ee75a 100644 --- a/pkg/sql/colexec/partition/partition.go +++ b/pkg/sql/colexec/partition/partition.go @@ -197,8 +197,9 @@ func (ctr *container) pickAndSend(proc *process.Process, result *vm.CallResult) hasSame := false var row int64 var cols []*vector.Vector + fromRemoveBatch := false for { - if wholeLength == 0 { + if wholeLength == 0 || fromRemoveBatch { choice = ctr.pickFirstRow() } else { if choice, hasSame = ctr.pickSameRow(row, cols); !hasSame { @@ -219,14 +220,22 @@ func (ctr *container) pickAndSend(proc *process.Process, result *vm.CallResult) wholeLength++ ctr.indexList[choice]++ + removeHasSame := true if ctr.indexList[choice] == int64(ctr.batchList[choice].RowCount()) { + removeHasSame = ctr.hasSameRow(row, cols, choice) ctr.removeBatch(proc, choice) + fromRemoveBatch = true + } else { + fromRemoveBatch = false } if len(ctr.indexList) == 0 { sendOver = true break } + if !removeHasSame { + break + } } ctr.buf.SetRowCount(wholeLength) result.Batch = ctr.buf @@ -278,6 +287,30 @@ func (ctr *container) pickSameRow(row int64, cols []*vector.Vector) (batIndex in return j, hasSame } +func (ctr *container) hasSameRow(row int64, cols []*vector.Vector, choice int) (hasSame bool) { + l := len(ctr.indexList) + + for j := 0; j < l; j++ { + if j == choice { + continue + } + hasSame = true + for k := 0; k < len(ctr.compares); k++ { + ctr.compares[k].Set(0, cols[k]) + ctr.compares[k].Set(1, ctr.orderCols[j][k]) + result := ctr.compares[k].Compare(0, 1, row, ctr.indexList[j]) + if result != 0 { + hasSame = false + break + } + } + if hasSame { + break + } + } + return hasSame +} + func (ctr *container) removeBatch(proc *process.Process, index int) { if ctr.batchList[index] != nil { ctr.batchList[index].Clean(proc.Mp()) diff --git a/pkg/sql/colexec/partition/partition_test.go b/pkg/sql/colexec/partition/partition_test.go index 2a44d927eebf4..43e7eb8c390f7 100644 --- a/pkg/sql/colexec/partition/partition_test.go +++ b/pkg/sql/colexec/partition/partition_test.go @@ -99,8 +99,10 @@ func newExpression(pos int32, typeID types.T) *plan.Expr { } func resetChildren(arg *Partition) { - bat := colexec.MakeMockBatchs() - op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat}) + bat1 := colexec.MakeMockPartitionBatchs(1) + bat2 := colexec.MakeMockPartitionBatchs(2) + bat3 := colexec.MakeMockPartitionBatchs(3) + op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat1, bat2, bat3}) arg.Children = nil arg.AppendChild(op) }