Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](mv) Fix sync mv add default select limit wrongly #47717

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
// to connection issues such as S3, so it is directly set to null
if (!isReplay) {
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, true);
mtmvCache = MTMVCache.from(this.getQuerySql(), MTMVPlanUtil.createMTMVContext(this), true,
true);
}
} catch (Throwable e) {
mtmvCache = null;
Expand Down Expand Up @@ -323,7 +324,8 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws Ana
MTMVCache mtmvCache;
try {
// Should new context with ADMIN user
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, false);
mtmvCache = MTMVCache.from(this.getQuerySql(), MTMVPlanUtil.createMTMVContext(this), true,
false);
} finally {
connectionContext.setThreadLocalInfo();
}
Expand Down
16 changes: 11 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.mtmv;

import org.apache.doris.catalog.MTMV;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
Expand Down Expand Up @@ -87,16 +86,23 @@ public StructInfo getStructInfo() {
return structInfo;
}

public static MTMVCache from(MTMV mtmv, ConnectContext connectContext, boolean needCost, boolean needLock) {
/**
* @param defSql the def sql of materialization
* @param connectContext should create new connectContext use MTMVPlanUtil createMTMVContext
* or createBasicMvContext
* @param needCost the plan from def sql should calc cost or not
* @param needLock should skip lock when create mtmv cache
*/
public static MTMVCache from(String defSql, ConnectContext connectContext, boolean needCost, boolean needLock) {
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
new OriginStatement(mtmv.getQuerySql(), 0));
if (needLock) {
new OriginStatement(defSql, 0));
if (!needLock) {
mvSqlStatementContext.setNeedLockTables(false);
}
if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) {
mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext);
}
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(defSql);
NereidsPlanner planner = new NereidsPlanner(mvSqlStatementContext);
boolean originalRewriteFlag = connectContext.getSessionVariable().enableMaterializedViewRewrite;
connectContext.getSessionVariable().enableMaterializedViewRewrite = false;
Expand Down
27 changes: 19 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,24 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;

public class MTMVPlanUtil {

public static ConnectContext createMTMVContext(MTMV mtmv) {
ConnectContext ctx = createBasicMvContext(null);
Optional<String> workloadGroup = mtmv.getWorkloadGroup();
if (workloadGroup.isPresent()) {
ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
}
// Set db&catalog to be used when creating materialized views to avoid SQL statements not writing the full path
// After https://github.com/apache/doris/pull/36543,
// After 1, this logic is no longer needed. This is to be compatible with older versions
setCatalogAndDb(ctx, mtmv);
return ctx;
}

public static ConnectContext createBasicMvContext(@Nullable ConnectContext parentContext) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setQualifiedUser(Auth.ADMIN_USER);
Expand All @@ -65,18 +79,15 @@ public static ConnectContext createMTMVContext(MTMV mtmv) {
String.join(",", ImmutableSet.of(
"COMPRESSED_MATERIALIZE_AGG", "COMPRESSED_MATERIALIZE_SORT",
RuleType.ADD_DEFAULT_LIMIT.name())));
Optional<String> workloadGroup = mtmv.getWorkloadGroup();
if (workloadGroup.isPresent()) {
ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
}
ctx.setStartTime();
// Set db&catalog to be used when creating materialized views to avoid SQL statements not writing the full path
// After https://github.com/apache/doris/pull/36543,
// After 1, this logic is no longer needed. This is to be compatible with older versions
setCatalogAndDb(ctx, mtmv);
if (parentContext != null) {
ctx.changeDefaultCatalog(parentContext.getDefaultCatalog());
ctx.setDatabase(parentContext.getDatabase());
}
return ctx;
}


private static void setCatalogAndDb(ConnectContext ctx, MTMV mtmv) {
EnvInfo envInfo = mtmv.getEnvInfo();
if (envInfo == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
Expand Down Expand Up @@ -256,8 +257,9 @@ private List<MaterializationContext> createSyncMvContexts(OlapTable olapTable,
LOG.warn(String.format("can't parse %s ", createMvSql));
continue;
}
MTMVCache mtmvCache = MaterializedViewUtils.createMTMVCache(querySql.get(),
cascadesContext.getConnectContext());
MTMVCache mtmvCache = MTMVCache.from(querySql.get(),
MTMVPlanUtil.createBasicMvContext(cascadesContext.getConnectContext()), true,
false);
contexts.add(new SyncMaterializationContext(mtmvCache.getLogicalPlan(),
mtmvCache.getOriginalPlan(), olapTable, meta.getIndexId(), indexName,
cascadesContext, mtmvCache.getStatistics()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,14 @@
import org.apache.doris.catalog.constraint.TableIdentifier;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.StructInfoMap;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.rules.rewrite.EliminateSort;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
Expand All @@ -55,25 +48,20 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.nereids.trees.plans.visitor.NondeterministicFunctionCollector;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.HashMultimap;
Expand Down Expand Up @@ -312,42 +300,6 @@ public static List<Expression> extractNondeterministicFunction(Plan plan) {
return nondeterministicFunctions;
}

/**
* createMTMVCache from querySql
*/
public static MTMVCache createMTMVCache(String querySql, ConnectContext connectContext) {
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(querySql);
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
new OriginStatement(querySql, 0));
mvSqlStatementContext.setNeedLockTables(false);
NereidsPlanner planner = new NereidsPlanner(mvSqlStatementContext);
if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) {
mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext);
}
// Can not convert to table sink, because use the same column from different table when self join
// the out slot is wrong
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainCommand.ExplainLevel.ALL_PLAN);
Plan originPlan = planner.getRewrittenPlan();
// Eliminate result sink because sink operator is useless in query rewrite by materialized view
// and the top sort can also be removed
Plan mvPlan = originPlan.accept(new DefaultPlanRewriter<Object>() {
@Override
public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, Object context) {
return logicalResultSink.child().accept(this, context);
}
}, null);
// Optimize by rules to remove top sort
CascadesContext parentCascadesContext = CascadesContext.initContext(mvSqlStatementContext, mvPlan,
PhysicalProperties.ANY);
mvPlan = MaterializedViewUtils.rewriteByRules(parentCascadesContext, childContext -> {
Rewriter.getCteChildrenRewriter(childContext,
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
return childContext.getRewritePlan();
}, mvPlan, originPlan);
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(),
planner.getCascadesContext().getMemo().getRoot().getStatistics(), null);
}

/**
* Check the query if Contains query operator
* Such sql as following should return true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query1 --
1 5

-- !query2 --
1 5
2 1

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite ("sql_default_limit") {

String db = context.config.getDbNameByFile(context.file)
sql """use ${db}"""

sql """ DROP TABLE IF EXISTS sql_default_limit_table; """

sql """
create table sql_default_limit_table
(
id1 int,
id2 int,
id3 int,
sale_date date,
sale_amt bigint
)
distributed by hash(id1)
properties("replication_num" = "1");
"""

sql """insert into sql_default_limit_table values(1,1,1,'2020-02-02',1);"""
sql """insert into sql_default_limit_table values(1,1,1,'2020-02-02',1);"""
sql """insert into sql_default_limit_table values(1,1,1,'2020-02-02',1);"""
sql """insert into sql_default_limit_table values(1,1,1,'2020-02-02',1);"""
sql """insert into sql_default_limit_table values(2,1,1,'2020-02-02',1);"""
sql """insert into sql_default_limit_table values(1,1,1,'2020-02-02',1);"""

create_sync_mv(db, "sql_default_limit_table", "test_mv",
"""select id1, sum(sale_amt) from sql_default_limit_table group by id1""")

sql """analyze table sql_default_limit_table with sync;"""
sql """alter table sql_default_limit_table modify column sale_amt set stats ('row_count'='6');"""


sql """set enable_stats=true;"""
sql """set sql_select_limit = 1;"""
mv_rewrite_success("select id1, sum(sale_amt) from sql_default_limit_table group by id1;", "test_mv")
order_qt_query1 """select id1, sum(sale_amt) from sql_default_limit_table group by id1;"""
sql """set sql_select_limit = -1;"""

sql """set default_order_by_limit = 2;"""
mv_rewrite_success("select id1, sum(sale_amt) from sql_default_limit_table group by id1;", "test_mv")
order_qt_query2 """select id1, sum(sale_amt) from sql_default_limit_table group by id1;"""
}