Skip to content

Commit

Permalink
fix operator of partition (#21274)
Browse files Browse the repository at this point in the history
fix operator of partition

Approved by: @ouyuanning, @aunjgr, @sukki37, @badboynt1
  • Loading branch information
iamlinjunhong authored Jan 17, 2025
1 parent e5433df commit 2cb4a71
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 3 deletions.
14 changes: 14 additions & 0 deletions pkg/sql/colexec/mock_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,20 @@ func MakeMockBatchs() *batch.Batch {
return bat
}

func MakeMockPartitionBatchs(val int32) *batch.Batch {
bat := batch.New([]string{"a"})
vecs := makeMockPartitionVecs(val)
bat.Vecs = vecs
bat.SetRowCount(vecs[0].Length())
return bat
}

func makeMockPartitionVecs(val int32) []*vector.Vector {
vecs := make([]*vector.Vector, 1)
vecs[0] = testutil.MakeInt32Vector([]int32{val, val, val}, nil)
return vecs
}

func makeMockTimeWinVecs() []*vector.Vector {
vecs := make([]*vector.Vector, 2)
vecs[0] = testutil.MakeDatetimeVector([]string{
Expand Down
35 changes: 34 additions & 1 deletion pkg/sql/colexec/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ func (ctr *container) pickAndSend(proc *process.Process, result *vm.CallResult)
hasSame := false
var row int64
var cols []*vector.Vector
fromRemoveBatch := false
for {
if wholeLength == 0 {
if wholeLength == 0 || fromRemoveBatch {
choice = ctr.pickFirstRow()
} else {
if choice, hasSame = ctr.pickSameRow(row, cols); !hasSame {
Expand All @@ -219,14 +220,22 @@ func (ctr *container) pickAndSend(proc *process.Process, result *vm.CallResult)
wholeLength++

ctr.indexList[choice]++
removeHasSame := true
if ctr.indexList[choice] == int64(ctr.batchList[choice].RowCount()) {
removeHasSame = ctr.hasSameRow(row, cols, choice)
ctr.removeBatch(proc, choice)
fromRemoveBatch = true
} else {
fromRemoveBatch = false
}

if len(ctr.indexList) == 0 {
sendOver = true
break
}
if !removeHasSame {
break
}
}
ctr.buf.SetRowCount(wholeLength)
result.Batch = ctr.buf
Expand Down Expand Up @@ -278,6 +287,30 @@ func (ctr *container) pickSameRow(row int64, cols []*vector.Vector) (batIndex in
return j, hasSame
}

func (ctr *container) hasSameRow(row int64, cols []*vector.Vector, choice int) (hasSame bool) {
l := len(ctr.indexList)

for j := 0; j < l; j++ {
if j == choice {
continue
}
hasSame = true
for k := 0; k < len(ctr.compares); k++ {
ctr.compares[k].Set(0, cols[k])
ctr.compares[k].Set(1, ctr.orderCols[j][k])
result := ctr.compares[k].Compare(0, 1, row, ctr.indexList[j])
if result != 0 {
hasSame = false
break
}
}
if hasSame {
break
}
}
return hasSame
}

func (ctr *container) removeBatch(proc *process.Process, index int) {
if ctr.batchList[index] != nil {
ctr.batchList[index].Clean(proc.Mp())
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/colexec/partition/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ func newExpression(pos int32, typeID types.T) *plan.Expr {
}

func resetChildren(arg *Partition) {
bat := colexec.MakeMockBatchs()
op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat})
bat1 := colexec.MakeMockPartitionBatchs(1)
bat2 := colexec.MakeMockPartitionBatchs(2)
bat3 := colexec.MakeMockPartitionBatchs(3)
op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat1, bat2, bat3})
arg.Children = nil
arg.AppendChild(op)
}

0 comments on commit 2cb4a71

Please sign in to comment.