Skip to content

Commit

Permalink
Merge branch 'main' into parition_state_metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 9, 2025
2 parents 251978a + fcbf276 commit 537e9be
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 43 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/colexec/projection/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ func (projection *Projection) Call(proc *process.Process) (vm.CallResult, error)

// keep shuffleIDX unchanged
projection.ctr.buf.ShuffleIDX = bat.ShuffleIDX
batches := []*batch.Batch{bat}
for i := range projection.ctr.projExecutors {
vec, err := projection.ctr.projExecutors[i].Eval(proc, []*batch.Batch{bat}, nil)
vec, err := projection.ctr.projExecutors[i].Eval(proc, batches, nil)
if err != nil {
return vm.CancelResult, err
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,13 +565,20 @@ func (s *Scope) getRelData(c *Compile, blockExprList []*plan.Expr) error {
}

if s.NodeInfo.CNCNT == 1 {
rsp := &engine.RangesShuffleParam{
Node: s.DataSource.node,
CNCNT: s.NodeInfo.CNCNT,
CNIDX: s.NodeInfo.CNIDX,
Init: false,
}
s.NodeInfo.Data, err = c.expandRanges(
s.DataSource.node,
rel,
db,
ctx,
blockExprList,
engine.Policy_CollectAllData, nil)
engine.Policy_CollectAllData,
rsp)
if err != nil {
return err
}
Expand Down Expand Up @@ -1012,6 +1019,12 @@ func (s *Scope) buildReaders(c *Compile) (readers []engine.Reader, err error) {
if err != nil {
return
}
for i := range s.DataSource.FilterList {
if plan2.IsFalseExpr(s.DataSource.FilterList[i]) {
emptyScan = true
break
}
}
if !emptyScan {
blockFilterList, err = s.handleBlockFilters(c, runtimeFilterList)
if err != nil {
Expand Down
47 changes: 24 additions & 23 deletions pkg/sql/plan/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ func SimpleInt64HashToRange(i uint64, upperLimit uint64) uint64 {
return hashtable.Int64HashWithFixedSeed(i) % upperLimit
}

func shuffleByZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap) uint64 {
func shuffleByZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap, bucketNum int) uint64 {
if !rsp.Init {
rsp.Init = true
switch zm.GetType() {
case types.T_int64, types.T_int32, types.T_int16:
rsp.ShuffleRangeInt64 = ShuffleRangeReEvalSigned(rsp.Node.Stats.HashmapStats.Ranges, int(rsp.CNCNT), rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt))
rsp.ShuffleRangeInt64 = ShuffleRangeReEvalSigned(rsp.Node.Stats.HashmapStats.Ranges, bucketNum, rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt))
case types.T_uint64, types.T_uint32, types.T_uint16, types.T_varchar, types.T_char, types.T_text, types.T_bit, types.T_datalink:
rsp.ShuffleRangeUint64 = ShuffleRangeReEvalUnsigned(rsp.Node.Stats.HashmapStats.Ranges, int(rsp.CNCNT), rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt))
rsp.ShuffleRangeUint64 = ShuffleRangeReEvalUnsigned(rsp.Node.Stats.HashmapStats.Ranges, bucketNum, rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt))
}
}

Expand All @@ -120,20 +120,20 @@ func shuffleByZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap) uint6
} else if rsp.ShuffleRangeInt64 != nil {
shuffleIDX = GetRangeShuffleIndexForZMSignedSlice(rsp.ShuffleRangeInt64, zm)
} else {
shuffleIDX = GetRangeShuffleIndexForZM(rsp.Node.Stats.HashmapStats.ShuffleColMin, rsp.Node.Stats.HashmapStats.ShuffleColMax, zm, uint64(rsp.CNCNT))
shuffleIDX = GetRangeShuffleIndexForZM(rsp.Node.Stats.HashmapStats.ShuffleColMin, rsp.Node.Stats.HashmapStats.ShuffleColMax, zm, uint64(bucketNum))
}
return shuffleIDX
}

func shuffleByValueExtractedFromZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap) uint64 {
func shuffleByValueExtractedFromZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap, bucketNum int) uint64 {
t := types.T(rsp.Node.Stats.HashmapStats.ShuffleColIdx) // actually this is specially used for sort key column type
if !rsp.Init {
rsp.Init = true
switch t {
case types.T_int64, types.T_int32, types.T_int16:
rsp.ShuffleRangeInt64 = ShuffleRangeReEvalSigned(rsp.Node.Stats.HashmapStats.Ranges, int(rsp.CNCNT), rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt))
rsp.ShuffleRangeInt64 = ShuffleRangeReEvalSigned(rsp.Node.Stats.HashmapStats.Ranges, bucketNum, rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt))
case types.T_uint64, types.T_uint32, types.T_uint16, types.T_varchar, types.T_char, types.T_text, types.T_bit, types.T_datalink:
rsp.ShuffleRangeUint64 = ShuffleRangeReEvalUnsigned(rsp.Node.Stats.HashmapStats.Ranges, int(rsp.CNCNT), rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt))
rsp.ShuffleRangeUint64 = ShuffleRangeReEvalUnsigned(rsp.Node.Stats.HashmapStats.Ranges, bucketNum, rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt))
}
}

Expand All @@ -143,11 +143,24 @@ func shuffleByValueExtractedFromZonemap(rsp *engine.RangesShuffleParam, zm objec
} else if rsp.ShuffleRangeInt64 != nil {
shuffleIDX = GetRangeShuffleIndexForValuesExtractedFromZMSignedSlice(rsp.ShuffleRangeInt64, zm, t)
} else {
shuffleIDX = GetRangeShuffleIndexForExtractedZM(rsp.Node.Stats.HashmapStats.ShuffleColMin, rsp.Node.Stats.HashmapStats.ShuffleColMax, zm, uint64(rsp.CNCNT), t)
shuffleIDX = GetRangeShuffleIndexForExtractedZM(rsp.Node.Stats.HashmapStats.ShuffleColMin, rsp.Node.Stats.HashmapStats.ShuffleColMax, zm, uint64(bucketNum), t)
}
return shuffleIDX
}

func CalcRangeShuffleIDXForObj(rsp *engine.RangesShuffleParam, objstats *objectio.ObjectStats, bucketNum int) uint64 {
zm := objstats.SortKeyZoneMap()
if !zm.IsInited() {
// an object with all null will send to shuffleIDX 0
return 0
}
if len(rsp.Node.TableDef.Pkey.Names) == 1 {
return shuffleByZonemap(rsp, zm, bucketNum)
} else {
return shuffleByValueExtractedFromZonemap(rsp, zm, bucketNum)
}
}

func ShouldSkipObjByShuffle(rsp *engine.RangesShuffleParam, objstats *objectio.ObjectStats) bool {
if rsp == nil || rsp.CNCNT <= 1 || rsp.Node == nil {
return false
Expand All @@ -157,24 +170,12 @@ func ShouldSkipObjByShuffle(rsp *engine.RangesShuffleParam, objstats *objectio.O
return !rsp.IsLocalCN
}
if rsp.Node.Stats.HashmapStats.ShuffleType == plan.ShuffleType_Range {
zm := objstats.SortKeyZoneMap()
if !zm.IsInited() {
// an object with all null will send to first CN
return rsp.CNIDX != 0
}
if len(rsp.Node.TableDef.Pkey.Names) == 1 {
shuffleIDX := shuffleByZonemap(rsp, zm)
return shuffleIDX != uint64(rsp.CNIDX)
} else {
shuffleIDX := shuffleByValueExtractedFromZonemap(rsp, zm)
return shuffleIDX != uint64(rsp.CNIDX)
}
//shuffle by range
return CalcRangeShuffleIDXForObj(rsp, objstats, int(rsp.CNCNT)) != uint64(rsp.CNIDX)
}

//shuffle by hash
objID := objstats.ObjectLocation().ObjectId()
index := SimpleCharHashToRange(objID[:], uint64(rsp.CNCNT))
return index != uint64(rsp.CNIDX)
return SimpleCharHashToRange(objID[:], uint64(rsp.CNCNT)) != uint64(rsp.CNIDX)
}

func GetCenterValueForZMSigned(zm objectio.ZoneMap) int64 {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/plan/shuffle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestShuffleByZonemap(t *testing.T) {
CNIDX: 0,
Init: false,
}
shuffleByZonemap(rsp, zm)
shuffleByZonemap(rsp, zm, 2)
}

func TestShuffleByValueExtractedFromZonemap(t *testing.T) {
Expand All @@ -362,7 +362,7 @@ func TestShuffleByValueExtractedFromZonemap(t *testing.T) {
CNIDX: 0,
Init: false,
}
idx := shuffleByValueExtractedFromZonemap(rsp, zm)
idx := shuffleByValueExtractedFromZonemap(rsp, zm, 3)
require.Equal(t, idx, uint64(2))

node = &plan.Node{
Expand Down Expand Up @@ -390,6 +390,6 @@ func TestShuffleByValueExtractedFromZonemap(t *testing.T) {
CNIDX: 0,
Init: false,
}
idx = shuffleByValueExtractedFromZonemap(rsp, zm)
idx = shuffleByValueExtractedFromZonemap(rsp, zm, 4)
require.Equal(t, idx, uint64(1))
}
5 changes: 1 addition & 4 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ func (tbl *txnTable) getObjList(ctx context.Context, rangesParam engine.RangesPa
needUncommited := rangesParam.Policy&engine.Policy_CollectUncommittedData != 0
objRelData := &readutil.ObjListRelData{
NeedFirstEmpty: needUncommited,
Rsp: rangesParam.Rsp,
}

if needUncommited {
Expand Down Expand Up @@ -1812,10 +1813,6 @@ func (tbl *txnTable) BuildReaders(
) ([]engine.Reader, error) {
var rds []engine.Reader
proc := p.(*process.Process)
//copy from NewReader.
if plan2.IsFalseExpr(expr) {
return []engine.Reader{new(readutil.EmptyReader)}, nil
}

if orderBy && num != 1 {
return nil, moerr.NewInternalErrorNoCtx("orderBy only support one reader")
Expand Down
5 changes: 0 additions & 5 deletions pkg/vm/engine/disttae/txn_table_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/shard"
pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo"
"github.com/matrixorigin/matrixone/pkg/shardservice"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache"
"github.com/matrixorigin/matrixone/pkg/vm/engine/readutil"
Expand Down Expand Up @@ -641,10 +640,6 @@ func (tbl *txnTableDelegate) BuildShardingReaders(
var rds []engine.Reader
proc := p.(*process.Process)

if plan2.IsFalseExpr(expr) {
return []engine.Reader{new(readutil.EmptyReader)}, nil
}

if orderBy && num != 1 {
return nil, moerr.NewInternalErrorNoCtx("orderBy only support one reader")
}
Expand Down
60 changes: 57 additions & 3 deletions pkg/vm/engine/readutil/relation_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package readutil
import (
"bytes"
"fmt"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -215,6 +217,7 @@ type ObjListRelData struct {
expanded bool
TotalBlocks uint32
Objlist []objectio.ObjectStats
Rsp *engine.RangesShuffleParam
blocklistRelData BlockListRelData
}

Expand Down Expand Up @@ -251,9 +254,60 @@ func (or *ObjListRelData) AppendShardID(id uint64) {
panic("not supported")
}

func (or *ObjListRelData) Split(i int) []engine.RelData {
or.expand()
return or.blocklistRelData.Split(i)
func (or *ObjListRelData) Split(cpunum int) []engine.RelData {
rsp := or.Rsp
if len(or.Objlist) < cpunum || or.TotalBlocks < 64 || rsp == nil || !rsp.Node.Stats.HashmapStats.Shuffle || rsp.Node.Stats.HashmapStats.ShuffleType != plan.ShuffleType_Range {
//dont need to range shuffle, just split average
or.expand()
return or.blocklistRelData.Split(cpunum)
}
//split by range shuffle
result := make([]engine.RelData, cpunum)
for i := range result {
result[i] = or.blocklistRelData.BuildEmptyRelData(int(or.TotalBlocks) / cpunum)
}
if or.NeedFirstEmpty {
result[0].AppendBlockInfo(&objectio.EmptyBlockInfo)
}
for i := range or.Objlist {
shuffleIDX := int(plan2.CalcRangeShuffleIDXForObj(rsp, &or.Objlist[i], int(rsp.CNCNT)*cpunum)) % cpunum
blks := objectio.ObjectStatsToBlockInfoSlice(&or.Objlist[i], false)
result[shuffleIDX].AppendBlockInfoSlice(blks)
}
//make result average
for {
maxCnt := result[0].DataCnt()
minCnt := result[0].DataCnt()
maxIdx := 0
minIdx := 0
for i := range result {
if result[i].DataCnt() > maxCnt {
maxCnt = result[i].DataCnt()
maxIdx = i
}
if result[i].DataCnt() < minCnt {
minCnt = result[i].DataCnt()
minIdx = i
}
}
if maxCnt < minCnt*2 {
break
}

diff := (maxCnt-minCnt)/3 + 1
cut_from := result[maxIdx].DataCnt() - diff
result[minIdx].AppendBlockInfoSlice(result[maxIdx].DataSlice(cut_from, maxCnt).GetBlockInfoSlice())
result[maxIdx] = result[maxIdx].DataSlice(0, cut_from)
}
//check total block cnt
totalBlocks := 0
for i := range result {
totalBlocks += result[i].DataCnt()
}
if totalBlocks != int(or.TotalBlocks) {
panic("wrong blocks cnt after objlist reldata split!")
}
return result
}

func (or *ObjListRelData) GetBlockInfoSlice() objectio.BlockInfoSlice {
Expand Down
6 changes: 3 additions & 3 deletions test/distributed/cases/database/system_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ show tables;
# show columns from `user`;
-- @ignore:0
show columns from `db`;
-- @ignore:0
-- @ignore:0,4
show columns from `procs_priv`;
-- @ignore:0
-- @ignore:0,4
show columns from `columns_priv`;
-- @ignore:0
-- @ignore:0,4
show columns from `tables_priv`;
use information_schema;
show tables;
Expand Down
3 changes: 3 additions & 0 deletions test/distributed/cases/optimizer/shuffle.result
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ mo_ctl(dn, flush, d1.t2)
select Sleep(1);
sleep(1)
0
prepare s from select * from t1 where 1=0;
execute s;
c1 c2 c3
explain select count(*) from t1,t2 where t1.c1=t2.c1;
AP QUERY PLAN ON MULTICN(4 core)
Project
Expand Down
2 changes: 2 additions & 0 deletions test/distributed/cases/optimizer/shuffle.test
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ select mo_ctl('dn', 'flush', 'd1.t1');
-- @separator:table
select mo_ctl('dn', 'flush', 'd1.t2');
select Sleep(1);
prepare s from select * from t1 where 1=0;
execute s;
-- @separator:table
explain select count(*) from t1,t2 where t1.c1=t2.c1;
select count(*) from t1,t2 where t1.c1=t2.c1;
Expand Down

0 comments on commit 537e9be

Please sign in to comment.