Skip to content

Commit

Permalink
[Fix](nereids) Fix get related partition table when no data (apache#29453
Browse files Browse the repository at this point in the history
)

Support creating a partitioned materialized view on a base table that has no data.
For example, given the following table definition:
>        CREATE TABLE `test_no_data` (
>         `user_id` LARGEINT NOT NULL COMMENT '"用户id"',
>         `date` DATE NOT NULL COMMENT '"数据灌入日期时间"',
>         `num` SMALLINT NOT NULL COMMENT '"数量"'
>        ) ENGINE=OLAP
>        DUPLICATE KEY(`user_id`, `date`, `num`)
>        COMMENT 'OLAP'
>        PARTITION BY RANGE(`date`)
>        (PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')),
>        PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')),
>        PARTITION p201703_all VALUES [('2017-03-01'), ('2017-04-01')))
>        DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
>        PROPERTIES ('replication_num' = '1') ;

Even when table test_no_data has no data, a partitioned materialized view can still be created on it, as follows:
>        CREATE MATERIALIZED VIEW no_data_partition_mv
>            BUILD IMMEDIATE REFRESH AUTO ON MANUAL
>            partition by(`date`)
>            DISTRIBUTED BY RANDOM BUCKETS 2
>            PROPERTIES ('replication_num' = '1')
>            AS
>           SELECT * FROM test_no_data where date > '2017-05-01';
>
  • Loading branch information
seawinde committed Jan 24, 2024
1 parent b702bac commit 48dc058
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class MTMVCache {
// this should be shuttle expression with lineage
private final List<NamedExpression> mvOutputExpressions;

public MTMVCache(MTMV materializedView, Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
public MTMVCache(Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
this.logicalPlan = logicalPlan;
this.mvOutputExpressions = mvOutputExpressions;
}
Expand All @@ -56,11 +56,6 @@ public List<NamedExpression> getMvOutputExpressions() {
return mvOutputExpressions;
}

public MTMVCache(Plan logicalPlan, List<NamedExpression> mvOutputExpressions) {
this.logicalPlan = logicalPlan;
this.mvOutputExpressions = mvOutputExpressions;
}

public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) {
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
// this will be removed in the future when support join derivation
Expand Down
18 changes: 17 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.Sets;

Expand All @@ -61,7 +63,21 @@ public static ConnectContext createMTMVContext(MTMV mtmv) throws AnalysisExcepti
}

/**
 * Plans the query SQL of {@code mtmv} and derives its {@link MTMVRelation} from the plan.
 *
 * <p>While planning, the session's disabled Nereids rules are temporarily replaced by
 * {@link CreateMTMVInfo#MTMV_PLANER_DISABLE_RULES}; the previous rule names are restored
 * afterwards, whether or not planning succeeds.
 *
 * @param mtmv the materialized view whose query SQL is planned
 * @param ctx connect context supplying the session variable and (optionally) the statement context
 * @return the relation generated from the resulting plan
 */
public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) {
    // Should not make table without data to empty relation when analyze the related table,
    // so add disable rules
    SessionVariable sessionVariable = ctx.getSessionVariable();
    Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
    sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
    if (ctx.getStatementContext() != null) {
        // drop the cached disabled-rule set so the temporary value is picked up
        ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
    }
    Plan plan;
    try {
        plan = getPlanBySql(mtmv.getQuerySql(), ctx);
    } finally {
        // roll back the disabled rules to what the session had before
        sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
        // Guard against a null statement context exactly as on the way in; an NPE thrown
        // from finally would otherwise hide the real planning exception.
        if (ctx.getStatementContext() != null) {
            ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
        }
    }
    return generateMTMVRelation(plan);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ public synchronized <T> T getOrRegisterCache(String key, Supplier<T> cacheSuppli
return supplier.get();
}

/**
 * Invalidates a cached value by removing the entry stored under {@code cacheKey} from
 * {@code contextCacheMap}. Intended to be called after the underlying value the entry was
 * derived from (for example a session variable) has changed.
 *
 * @param cacheKey key of the cache entry to drop
 */
public synchronized void invalidCache(String cacheKey) {
contextCacheMap.remove(cacheKey);
}

public ColumnAliasGenerator getColumnAliasGenerator() {
return columnAliasGenerator == null
? columnAliasGenerator = new ColumnAliasGenerator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@ protected void countJobExecutionTimesOfGroupExpressions(GroupExpression groupExp

public static Set<Integer> getDisableRules(JobContext context) {
return context.getCascadesContext().getAndCacheSessionVariable(
"disableNereidsRules", ImmutableSet.of(), SessionVariable::getDisableNereidsRules);
SessionVariable.DISABLE_NEREIDS_RULES, ImmutableSet.of(), SessionVariable::getDisableNereidsRules);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
Expand All @@ -63,6 +65,7 @@
import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand All @@ -85,7 +88,7 @@
*/
public class CreateMTMVInfo {
public static final Logger LOG = LogManager.getLogger(CreateMTMVInfo.class);

public static final String MTMV_PLANER_DISABLE_RULES = "OLAP_SCAN_PARTITION_PRUNE";
private final boolean ifNotExists;
private final TableNameInfo mvName;
private List<String> keys;
Expand Down Expand Up @@ -206,7 +209,8 @@ private void analyzeProperties() {
*/
public void analyzeQuery(ConnectContext ctx) {
// create table as select
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
StatementContext statementContext = ctx.getStatementContext();
NereidsPlanner planner = new NereidsPlanner(statementContext);
// this is for expression column name infer when not use alias
LogicalSink<Plan> logicalSink = new UnboundResultSink<>(logicalQuery);
Plan plan = planner.plan(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
Expand All @@ -228,44 +232,73 @@ public void analyzeQuery(ConnectContext ctx) {
}

/**
 * Plans {@code logicalQuery} and stores the relation generated from the plan in
 * {@code this.relation}.
 *
 * <p>While planning, the session's disabled Nereids rules are temporarily replaced by
 * {@link #MTMV_PLANER_DISABLE_RULES}; the previous rule names are restored afterwards,
 * whether or not planning succeeds.
 *
 * @param planner planner used to plan the query; its cascades context provides the connect context
 */
private void getRelation(NereidsPlanner planner) {
    // Should not make table without data to empty relation when analyze the related table,
    // so add disable rules
    ConnectContext ctx = planner.getCascadesContext().getConnectContext();
    SessionVariable sessionVariable = ctx.getSessionVariable();
    Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
    sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
    if (ctx.getStatementContext() != null) {
        // drop the cached disabled-rule set so the temporary value is picked up
        ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
    }
    Plan plan;
    try {
        plan = planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE);
    } finally {
        // roll back the disabled rules to what the session had before
        sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
        // Guard against a null statement context exactly as on the way in; an NPE thrown
        // from finally would otherwise hide the real planning exception.
        if (ctx.getStatementContext() != null) {
            ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
        }
    }
    this.relation = MTMVPlanUtil.generateMTMVRelation(plan);
}

private void analyzePartition(NereidsPlanner planner) {
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
Plan mvRewrittenPlan =
planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
Optional<RelatedTableInfo> relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(mvPartitionInfo.getPartitionCol(), mvRewrittenPlan);
if (!relatedTableInfo.isPresent() || !relatedTableInfo.get().isPctPossible()) {
throw new AnalysisException("Unable to find a suitable base table for partitioning");
}
TableIf followTable = null;
try {
followTable = MTMVUtil.getTable(relatedTableInfo.get().getTableInfo());
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage(), e);
}
if (!(followTable instanceof OlapTable)) {
throw new AnalysisException("base table for partitioning only can be OlapTable.");
}
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);

CascadesContext cascadesContext = planner.getCascadesContext();
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
sessionVariable.setDisableNereidsRules(MTMV_PLANER_DISABLE_RULES);
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
try {
partitionColumnNames.addAll(((OlapTable) followTable).getPartitionColumnNames());
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
Plan mvRewrittenPlan =
planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
Optional<RelatedTableInfo> relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(mvPartitionInfo.getPartitionCol(), mvRewrittenPlan);
if (!relatedTableInfo.isPresent() || !relatedTableInfo.get().isPctPossible()) {
throw new AnalysisException("Unable to find a suitable base table for partitioning");
}
TableIf followTable = null;
try {
followTable = MTMVUtil.getTable(relatedTableInfo.get().getTableInfo());
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage(), e);
}
if (!(followTable instanceof OlapTable)) {
throw new AnalysisException("base table for partitioning only can be OlapTable.");
}
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
partitionColumnNames.addAll(((OlapTable) followTable).getPartitionColumnNames());
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}

if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn());
}
if (partitionColumnNames.size() != 1) {
throw new AnalysisException("base table for partitioning only support single column.");
if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn());
}
if (partitionColumnNames.size() != 1) {
throw new AnalysisException("base table for partitioning only support single column.");
}
mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo());
mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn());
partitionDesc = generatePartitionDesc((OlapTable) followTable);
} finally {
// after operate, roll back the disable rules
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo());
mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn());
partitionDesc = generatePartitionDesc((OlapTable) followTable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,55 @@ protected void runBeforeAll() throws Exception {
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ")");

createTable("CREATE TABLE `lineitem_no_data` (\n"
+ " `l_orderkey` BIGINT NOT NULL,\n"
+ " `l_linenumber` INT NOT NULL,\n"
+ " `l_partkey` INT NOT NULL,\n"
+ " `l_suppkey` INT NOT NULL,\n"
+ " `l_quantity` DECIMAL(15, 2) NOT NULL,\n"
+ " `l_extendedprice` DECIMAL(15, 2) NOT NULL,\n"
+ " `l_discount` DECIMAL(15, 2) NOT NULL,\n"
+ " `l_tax` DECIMAL(15, 2) NOT NULL,\n"
+ " `l_returnflag` VARCHAR(1) NOT NULL,\n"
+ " `l_linestatus` VARCHAR(1) NOT NULL,\n"
+ " `l_commitdate` DATE NOT NULL,\n"
+ " `l_receiptdate` DATE NOT NULL,\n"
+ " `l_shipinstruct` VARCHAR(25) NOT NULL,\n"
+ " `l_shipmode` VARCHAR(10) NOT NULL,\n"
+ " `l_comment` VARCHAR(44) NOT NULL,\n"
+ " `l_shipdate` DATE NOT NULL\n"
+ " ) ENGINE=OLAP\n"
+ " DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey )\n"
+ " COMMENT 'OLAP'\n"
+ " AUTO PARTITION BY range date_trunc(`l_shipdate`, 'day') ()\n"
+ " DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96\n"
+ " PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ " );\n"
+ "\n");
// Should not make scan to empty relation when the table used by materialized view has no data
connectContext.getSessionVariable().setDisableNereidsRules("OLAP_SCAN_PARTITION_PRUNE");
}

@Test
public void getRelatedTableInfoWhenAutoPartitionTest() {
    // Joins a date-filtered slice of lineitem_no_data with a date-filtered slice of orders,
    // then checks that the related table info for l_shipdate still resolves to lineitem_no_data.
    String joinSql = "select * from "
            + "(select * from lineitem_no_data "
            + "where l_shipdate >= \"2023-12-01\" and l_shipdate <= \"2023-12-03\") t1 "
            + "left join "
            + "(select * from orders where o_orderdate >= \"2023-12-01\" and o_orderdate <= \"2023-12-03\" ) t2 "
            + "on t1.l_orderkey = o_orderkey;";
    PlanChecker.from(connectContext).checkExplain(joinSql, nereidsPlanner -> {
        Optional<RelatedTableInfo> related = MaterializedViewUtils.getRelatedTableInfo(
                "l_shipdate", nereidsPlanner.getRewrittenPlan());
        checkRelatedTableInfo(related, "lineitem_no_data", "L_SHIPDATE", true);
    });
}

@Test
Expand Down
30 changes: 30 additions & 0 deletions regression-test/suites/mtmv_p0/test_partition_refresh_mtmv.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -400,4 +400,34 @@ suite("test_partition_refresh_mtmv") {
waitingMTMVTaskFinished(jobName)
order_qt_exclude_will_change "SELECT * FROM ${mvName} order by user_id,age,date,num"
order_qt_change_status "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'"


// test create partitioned materialized view using no data table
sql """drop table if exists test_no_data;"""
sql """
CREATE TABLE `test_no_data` (
`user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
`date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"',
`num` SMALLINT NOT NULL COMMENT '\"数量\"'
) ENGINE=OLAP
DUPLICATE KEY(`user_id`, `date`, `num`)
COMMENT 'OLAP'
PARTITION BY RANGE(`date`)
(PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')),
PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')),
PARTITION p201703_all VALUES [('2017-03-01'), ('2017-04-01')))
DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
PROPERTIES ('replication_num' = '1') ;
"""

sql """DROP MATERIALIZED VIEW IF EXISTS no_data_partition_mv;"""
sql """
CREATE MATERIALIZED VIEW no_data_partition_mv
BUILD IMMEDIATE REFRESH AUTO ON MANUAL
partition by(`date`)
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS
SELECT * FROM test_no_data where date > '2017-05-01';
"""
}

0 comments on commit 48dc058

Please sign in to comment.