Skip to content

Commit

Permalink
support shuffle pipeline refactoring for right joins (#21203)
Browse files Browse the repository at this point in the history
support shuffle pipeline refactoring for right joins

Approved by: @ouyuanning
  • Loading branch information
badboynt1 authored Jan 14, 2025
1 parent 7f6aab2 commit 6ca9092
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 30 deletions.
18 changes: 6 additions & 12 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2140,12 +2140,8 @@ func (c *Compile) compileUnionAll(node *plan.Node, ss []*Scope, children []*Scop
func (c *Compile) compileJoin(node, left, right *plan.Node, probeScopes, buildScopes []*Scope) []*Scope {
if node.Stats.HashmapStats.Shuffle {
if len(c.cnList) == 1 {
if node.JoinType == plan.Node_DEDUP || node.BuildOnLeft {
logutil.Infof("not support shuffle v2 for dedup or right join now")
} else if left.NodeType == plan.Node_JOIN && left.Stats.HashmapStats.Shuffle && left.BuildOnLeft {
logutil.Infof("not support shuffle v2 for right join now")
} else if right.NodeType == plan.Node_JOIN && right.Stats.HashmapStats.Shuffle && right.BuildOnLeft {
logutil.Infof("not support shuffle v2 for right join now")
if node.JoinType == plan.Node_DEDUP {
logutil.Infof("not support shuffle v2 for dedup join now")
} else {
return c.compileShuffleJoinV2(node, left, right, probeScopes, buildScopes)
}
Expand Down Expand Up @@ -2271,6 +2267,9 @@ func constructShuffleJoinOP(c *Compile, shuffleJoins []*Scope, node, left, right
for i := range shuffleJoins {
op := constructRight(node, leftTyps, rightTyps, c.proc)
op.ShuffleIdx = int32(i)
if shuffleV2 {
op.ShuffleIdx = -1
}
op.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag)
shuffleJoins[i].setRootOperator(op)
}
Expand Down Expand Up @@ -3031,12 +3030,7 @@ func (c *Compile) compileShuffleGroupV2(n *plan.Node, inputSS []*Scope, nodes []

func (c *Compile) compileShuffleGroup(n *plan.Node, inputSS []*Scope, nodes []*plan.Node) []*Scope {
if len(c.cnList) == 1 {
child := nodes[n.Children[0]]
if child.NodeType == plan.Node_JOIN && child.Stats.HashmapStats.Shuffle && n.BuildOnLeft {
logutil.Infof("not support shuffle v2 for right join now")
} else {
return c.compileShuffleGroupV2(n, inputSS, nodes)
}
return c.compileShuffleGroupV2(n, inputSS, nodes)
}

if n.Stats.HashmapStats.ShuffleMethod == plan.ShuffleMethod_Reuse {
Expand Down
42 changes: 24 additions & 18 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,6 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
case vm.Right:
t := sourceOp.(*right.RightJoin)
op := right.NewArgument()
if t.Channel == nil {
t.Channel = make(chan *bitmap.Bitmap, maxParallel)
}
op.Channel = t.Channel
op.NumCPU = uint64(maxParallel)
op.IsMerger = (index == 0)
op.Cond = t.Cond
op.Result = t.Result
op.RightTypes = t.RightTypes
Expand All @@ -227,6 +221,14 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
op.JoinMapTag = t.JoinMapTag
op.HashOnPK = t.HashOnPK
op.IsShuffle = t.IsShuffle
if !t.IsShuffle {
if t.Channel == nil {
t.Channel = make(chan *bitmap.Bitmap, maxParallel)
}
op.Channel = t.Channel
op.NumCPU = uint64(maxParallel)
op.IsMerger = (index == 0)
}
if t.ShuffleIdx == -1 { // shuffleV2
op.ShuffleIdx = int32(index)
}
Expand All @@ -235,12 +237,6 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
case vm.RightSemi:
t := sourceOp.(*rightsemi.RightSemi)
op := rightsemi.NewArgument()
if t.Channel == nil {
t.Channel = make(chan *bitmap.Bitmap, maxParallel)
}
op.Channel = t.Channel
op.NumCPU = uint64(maxParallel)
op.IsMerger = (index == 0)
op.Cond = t.Cond
op.Result = t.Result
op.RightTypes = t.RightTypes
Expand All @@ -249,6 +245,14 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
op.JoinMapTag = t.JoinMapTag
op.HashOnPK = t.HashOnPK
op.IsShuffle = t.IsShuffle
if !t.IsShuffle {
if t.Channel == nil {
t.Channel = make(chan *bitmap.Bitmap, maxParallel)
}
op.Channel = t.Channel
op.NumCPU = uint64(maxParallel)
op.IsMerger = (index == 0)
}
if t.ShuffleIdx == -1 { // shuffleV2
op.ShuffleIdx = int32(index)
}
Expand All @@ -257,12 +261,6 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
case vm.RightAnti:
t := sourceOp.(*rightanti.RightAnti)
op := rightanti.NewArgument()
if t.Channel == nil {
t.Channel = make(chan *bitmap.Bitmap, maxParallel)
}
op.Channel = t.Channel
op.NumCPU = uint64(maxParallel)
op.IsMerger = (index == 0)
op.Cond = t.Cond
op.Result = t.Result
op.RightTypes = t.RightTypes
Expand All @@ -271,6 +269,14 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
op.JoinMapTag = t.JoinMapTag
op.HashOnPK = t.HashOnPK
op.IsShuffle = t.IsShuffle
if !t.IsShuffle {
if t.Channel == nil {
t.Channel = make(chan *bitmap.Bitmap, maxParallel)
}
op.Channel = t.Channel
op.NumCPU = uint64(maxParallel)
op.IsMerger = (index == 0)
}
if t.ShuffleIdx == -1 { // shuffleV2
op.ShuffleIdx = int32(index)
}
Expand Down

0 comments on commit 6ca9092

Please sign in to comment.