diff --git a/pkg/sql/colexec/projection/projection.go b/pkg/sql/colexec/projection/projection.go index 397df7f27b706..7793bbc37d194 100644 --- a/pkg/sql/colexec/projection/projection.go +++ b/pkg/sql/colexec/projection/projection.go @@ -71,8 +71,9 @@ func (projection *Projection) Call(proc *process.Process) (vm.CallResult, error) // keep shuffleIDX unchanged projection.ctr.buf.ShuffleIDX = bat.ShuffleIDX + batches := []*batch.Batch{bat} for i := range projection.ctr.projExecutors { - vec, err := projection.ctr.projExecutors[i].Eval(proc, []*batch.Batch{bat}, nil) + vec, err := projection.ctr.projExecutors[i].Eval(proc, batches, nil) if err != nil { return vm.CancelResult, err } diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 3dded9ddf696c..9fd91196b10fc 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -565,13 +565,20 @@ func (s *Scope) getRelData(c *Compile, blockExprList []*plan.Expr) error { } if s.NodeInfo.CNCNT == 1 { + rsp := &engine.RangesShuffleParam{ + Node: s.DataSource.node, + CNCNT: s.NodeInfo.CNCNT, + CNIDX: s.NodeInfo.CNIDX, + Init: false, + } s.NodeInfo.Data, err = c.expandRanges( s.DataSource.node, rel, db, ctx, blockExprList, - engine.Policy_CollectAllData, nil) + engine.Policy_CollectAllData, + rsp) if err != nil { return err } @@ -1012,6 +1019,12 @@ func (s *Scope) buildReaders(c *Compile) (readers []engine.Reader, err error) { if err != nil { return } + for i := range s.DataSource.FilterList { + if plan2.IsFalseExpr(s.DataSource.FilterList[i]) { + emptyScan = true + break + } + } if !emptyScan { blockFilterList, err = s.handleBlockFilters(c, runtimeFilterList) if err != nil { diff --git a/pkg/sql/plan/shuffle.go b/pkg/sql/plan/shuffle.go index eb89ea5c13b4d..a54c906f03b15 100644 --- a/pkg/sql/plan/shuffle.go +++ b/pkg/sql/plan/shuffle.go @@ -103,14 +103,14 @@ func SimpleInt64HashToRange(i uint64, upperLimit uint64) uint64 { return hashtable.Int64HashWithFixedSeed(i) % upperLimit } -func shuffleByZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap) uint64 { +func shuffleByZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap, bucketNum int) uint64 { if !rsp.Init { rsp.Init = true switch zm.GetType() { case types.T_int64, types.T_int32, types.T_int16: - rsp.ShuffleRangeInt64 = ShuffleRangeReEvalSigned(rsp.Node.Stats.HashmapStats.Ranges, int(rsp.CNCNT), rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt)) + rsp.ShuffleRangeInt64 = ShuffleRangeReEvalSigned(rsp.Node.Stats.HashmapStats.Ranges, bucketNum, rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt)) case types.T_uint64, types.T_uint32, types.T_uint16, types.T_varchar, types.T_char, types.T_text, types.T_bit, types.T_datalink: - rsp.ShuffleRangeUint64 = ShuffleRangeReEvalUnsigned(rsp.Node.Stats.HashmapStats.Ranges, int(rsp.CNCNT), rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt)) + rsp.ShuffleRangeUint64 = ShuffleRangeReEvalUnsigned(rsp.Node.Stats.HashmapStats.Ranges, bucketNum, rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt)) } } @@ -120,20 +120,20 @@ func shuffleByZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap) uint6 } else if rsp.ShuffleRangeInt64 != nil { shuffleIDX = GetRangeShuffleIndexForZMSignedSlice(rsp.ShuffleRangeInt64, zm) } else { - shuffleIDX = GetRangeShuffleIndexForZM(rsp.Node.Stats.HashmapStats.ShuffleColMin, rsp.Node.Stats.HashmapStats.ShuffleColMax, zm, uint64(rsp.CNCNT)) + shuffleIDX = GetRangeShuffleIndexForZM(rsp.Node.Stats.HashmapStats.ShuffleColMin, rsp.Node.Stats.HashmapStats.ShuffleColMax, zm, uint64(bucketNum)) } return shuffleIDX } -func shuffleByValueExtractedFromZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap) uint64 { +func shuffleByValueExtractedFromZonemap(rsp *engine.RangesShuffleParam, zm objectio.ZoneMap, bucketNum int) uint64 { t := types.T(rsp.Node.Stats.HashmapStats.ShuffleColIdx) // actually this is specially used for sort key column type if !rsp.Init { rsp.Init = true switch t { case types.T_int64, types.T_int32, types.T_int16: - rsp.ShuffleRangeInt64 = ShuffleRangeReEvalSigned(rsp.Node.Stats.HashmapStats.Ranges, int(rsp.CNCNT), rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt)) + rsp.ShuffleRangeInt64 = ShuffleRangeReEvalSigned(rsp.Node.Stats.HashmapStats.Ranges, bucketNum, rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt)) case types.T_uint64, types.T_uint32, types.T_uint16, types.T_varchar, types.T_char, types.T_text, types.T_bit, types.T_datalink: - rsp.ShuffleRangeUint64 = ShuffleRangeReEvalUnsigned(rsp.Node.Stats.HashmapStats.Ranges, int(rsp.CNCNT), rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt)) + rsp.ShuffleRangeUint64 = ShuffleRangeReEvalUnsigned(rsp.Node.Stats.HashmapStats.Ranges, bucketNum, rsp.Node.Stats.HashmapStats.Nullcnt, int64(rsp.Node.Stats.TableCnt)) } } @@ -143,11 +143,24 @@ func shuffleByValueExtractedFromZonemap(rsp *engine.RangesShuffleParam, zm objec } else if rsp.ShuffleRangeInt64 != nil { shuffleIDX = GetRangeShuffleIndexForValuesExtractedFromZMSignedSlice(rsp.ShuffleRangeInt64, zm, t) } else { - shuffleIDX = GetRangeShuffleIndexForExtractedZM(rsp.Node.Stats.HashmapStats.ShuffleColMin, rsp.Node.Stats.HashmapStats.ShuffleColMax, zm, uint64(rsp.CNCNT), t) + shuffleIDX = GetRangeShuffleIndexForExtractedZM(rsp.Node.Stats.HashmapStats.ShuffleColMin, rsp.Node.Stats.HashmapStats.ShuffleColMax, zm, uint64(bucketNum), t) } return shuffleIDX } +func CalcRangeShuffleIDXForObj(rsp *engine.RangesShuffleParam, objstats *objectio.ObjectStats, bucketNum int) uint64 { + zm := objstats.SortKeyZoneMap() + if !zm.IsInited() { + // an object with all null will send to shuffleIDX 0 + return 0 + } + if len(rsp.Node.TableDef.Pkey.Names) == 1 { + return shuffleByZonemap(rsp, zm, bucketNum) + } else { + return shuffleByValueExtractedFromZonemap(rsp, zm, bucketNum) + } +} + func ShouldSkipObjByShuffle(rsp *engine.RangesShuffleParam, objstats *objectio.ObjectStats) bool { if rsp == nil || rsp.CNCNT <= 1 || rsp.Node == nil { return false @@ -157,24 +170,12 @@ func ShouldSkipObjByShuffle(rsp *engine.RangesShuffleParam, objstats *objectio.O return !rsp.IsLocalCN } if rsp.Node.Stats.HashmapStats.ShuffleType == plan.ShuffleType_Range { - zm := objstats.SortKeyZoneMap() - if !zm.IsInited() { - // an object with all null will send to first CN - return rsp.CNIDX != 0 - } - if len(rsp.Node.TableDef.Pkey.Names) == 1 { - shuffleIDX := shuffleByZonemap(rsp, zm) - return shuffleIDX != uint64(rsp.CNIDX) - } else { - shuffleIDX := shuffleByValueExtractedFromZonemap(rsp, zm) - return shuffleIDX != uint64(rsp.CNIDX) - } + //shuffle by range + return CalcRangeShuffleIDXForObj(rsp, objstats, int(rsp.CNCNT)) != uint64(rsp.CNIDX) } - //shuffle by hash objID := objstats.ObjectLocation().ObjectId() - index := SimpleCharHashToRange(objID[:], uint64(rsp.CNCNT)) - return index != uint64(rsp.CNIDX) + return SimpleCharHashToRange(objID[:], uint64(rsp.CNCNT)) != uint64(rsp.CNIDX) } func GetCenterValueForZMSigned(zm objectio.ZoneMap) int64 { diff --git a/pkg/sql/plan/shuffle_test.go b/pkg/sql/plan/shuffle_test.go index 9c2a532ea4c96..abc873694725d 100644 --- a/pkg/sql/plan/shuffle_test.go +++ b/pkg/sql/plan/shuffle_test.go @@ -337,7 +337,7 @@ func TestShuffleByZonemap(t *testing.T) { CNIDX: 0, Init: false, } - shuffleByZonemap(rsp, zm) + shuffleByZonemap(rsp, zm, 2) } func TestShuffleByValueExtractedFromZonemap(t *testing.T) { @@ -362,7 +362,7 @@ func TestShuffleByValueExtractedFromZonemap(t *testing.T) { CNIDX: 0, Init: false, } - idx := shuffleByValueExtractedFromZonemap(rsp, zm) + idx := shuffleByValueExtractedFromZonemap(rsp, zm, 3) require.Equal(t, idx, uint64(2)) node = &plan.Node{ @@ -390,6 +390,6 @@ func TestShuffleByValueExtractedFromZonemap(t *testing.T) { CNIDX: 0, Init: false, } - idx = shuffleByValueExtractedFromZonemap(rsp, zm) + idx = shuffleByValueExtractedFromZonemap(rsp, zm, 4) require.Equal(t, idx, uint64(1)) } diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index cce1cf9c74ca4..1cdb62edcaeda 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -588,6 +588,7 @@ func (tbl *txnTable) getObjList(ctx context.Context, rangesParam engine.RangesPa needUncommited := rangesParam.Policy&engine.Policy_CollectUncommittedData != 0 objRelData := &readutil.ObjListRelData{ NeedFirstEmpty: needUncommited, + Rsp: rangesParam.Rsp, } if needUncommited { @@ -1812,10 +1813,6 @@ func (tbl *txnTable) BuildReaders( ) ([]engine.Reader, error) { var rds []engine.Reader proc := p.(*process.Process) - //copy from NewReader. - if plan2.IsFalseExpr(expr) { - return []engine.Reader{new(readutil.EmptyReader)}, nil - } if orderBy && num != 1 { return nil, moerr.NewInternalErrorNoCtx("orderBy only support one reader") diff --git a/pkg/vm/engine/disttae/txn_table_sharding.go b/pkg/vm/engine/disttae/txn_table_sharding.go index 1b7e3da2fbc78..312568b827552 100644 --- a/pkg/vm/engine/disttae/txn_table_sharding.go +++ b/pkg/vm/engine/disttae/txn_table_sharding.go @@ -34,7 +34,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/pb/shard" pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" "github.com/matrixorigin/matrixone/pkg/shardservice" - plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" "github.com/matrixorigin/matrixone/pkg/vm/engine/readutil" @@ -641,10 +640,6 @@ func (tbl *txnTableDelegate) BuildShardingReaders( var rds []engine.Reader proc := p.(*process.Process) - if plan2.IsFalseExpr(expr) { - return []engine.Reader{new(readutil.EmptyReader)}, nil - } - if orderBy && num != 1 { return nil, moerr.NewInternalErrorNoCtx("orderBy only support one reader") } diff --git a/pkg/vm/engine/readutil/relation_data.go b/pkg/vm/engine/readutil/relation_data.go index 73a0b7bf94484..4edc8b57680b7 100644 --- a/pkg/vm/engine/readutil/relation_data.go +++ b/pkg/vm/engine/readutil/relation_data.go @@ -17,6 +17,8 @@ package readutil import ( "bytes" "fmt" + "github.com/matrixorigin/matrixone/pkg/pb/plan" + plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -215,6 +217,7 @@ type ObjListRelData struct { expanded bool TotalBlocks uint32 Objlist []objectio.ObjectStats + Rsp *engine.RangesShuffleParam blocklistRelData BlockListRelData } @@ -251,9 +254,60 @@ func (or *ObjListRelData) AppendShardID(id uint64) { panic("not supported") } -func (or *ObjListRelData) Split(i int) []engine.RelData { - or.expand() - return or.blocklistRelData.Split(i) +func (or *ObjListRelData) Split(cpunum int) []engine.RelData { + rsp := or.Rsp + if len(or.Objlist) < cpunum || or.TotalBlocks < 64 || rsp == nil || !rsp.Node.Stats.HashmapStats.Shuffle || rsp.Node.Stats.HashmapStats.ShuffleType != plan.ShuffleType_Range { + //dont need to range shuffle, just split average + or.expand() + return or.blocklistRelData.Split(cpunum) + } + //split by range shuffle + result := make([]engine.RelData, cpunum) + for i := range result { + result[i] = or.blocklistRelData.BuildEmptyRelData(int(or.TotalBlocks) / cpunum) + } + if or.NeedFirstEmpty { + result[0].AppendBlockInfo(&objectio.EmptyBlockInfo) + } + for i := range or.Objlist { + shuffleIDX := int(plan2.CalcRangeShuffleIDXForObj(rsp, &or.Objlist[i], int(rsp.CNCNT)*cpunum)) % cpunum + blks := objectio.ObjectStatsToBlockInfoSlice(&or.Objlist[i], false) + result[shuffleIDX].AppendBlockInfoSlice(blks) + } + //make result average + for { + maxCnt := result[0].DataCnt() + minCnt := result[0].DataCnt() + maxIdx := 0 + minIdx := 0 + for i := range result { + if result[i].DataCnt() > maxCnt { + maxCnt = result[i].DataCnt() + maxIdx = i + } + if result[i].DataCnt() < minCnt { + minCnt = result[i].DataCnt() + minIdx = i + } + } + if maxCnt < minCnt*2 { + break + } + + diff := (maxCnt-minCnt)/3 + 1 + cut_from := result[maxIdx].DataCnt() - diff + result[minIdx].AppendBlockInfoSlice(result[maxIdx].DataSlice(cut_from, maxCnt).GetBlockInfoSlice()) + result[maxIdx] = result[maxIdx].DataSlice(0, cut_from) + } + //check total block cnt + totalBlocks := 0 + for i := range result { + totalBlocks += result[i].DataCnt() + } + if totalBlocks != int(or.TotalBlocks) { + panic("wrong blocks cnt after objlist reldata split!") + } + return result } func (or *ObjListRelData) GetBlockInfoSlice() objectio.BlockInfoSlice { diff --git a/test/distributed/cases/database/system_table.sql b/test/distributed/cases/database/system_table.sql index c08e9ecea7890..1959e8837847c 100644 --- a/test/distributed/cases/database/system_table.sql +++ b/test/distributed/cases/database/system_table.sql @@ -20,11 +20,11 @@ show tables; # show columns from `user`; -- @ignore:0 show columns from `db`; --- @ignore:0 +-- @ignore:0,4 show columns from `procs_priv`; --- @ignore:0 +-- @ignore:0,4 show columns from `columns_priv`; --- @ignore:0 +-- @ignore:0,4 show columns from `tables_priv`; use information_schema; show tables; diff --git a/test/distributed/cases/optimizer/shuffle.result b/test/distributed/cases/optimizer/shuffle.result index 37fda2a965914..7a941ba5ee302 100644 --- a/test/distributed/cases/optimizer/shuffle.result +++ b/test/distributed/cases/optimizer/shuffle.result @@ -16,6 +16,9 @@ mo_ctl(dn, flush, d1.t2) select Sleep(1); sleep(1) 0 +prepare s from select * from t1 where 1=0; +execute s; +c1 c2 c3 explain select count(*) from t1,t2 where t1.c1=t2.c1; AP QUERY PLAN ON MULTICN(4 core) Project diff --git a/test/distributed/cases/optimizer/shuffle.test b/test/distributed/cases/optimizer/shuffle.test index 4245f69cd7f02..a37d9cd306232 100644 --- a/test/distributed/cases/optimizer/shuffle.test +++ b/test/distributed/cases/optimizer/shuffle.test @@ -12,6 +12,8 @@ select mo_ctl('dn', 'flush', 'd1.t1'); -- @separator:table select mo_ctl('dn', 'flush', 'd1.t2'); select Sleep(1); +prepare s from select * from t1 where 1=0; +execute s; -- @separator:table explain select count(*) from t1,t2 where t1.c1=t2.c1; select count(*) from t1,t2 where t1.c1=t2.c1;