Skip to content

Commit

Permalink
[refactor](neredis)Remove dependency of old planner partition prune c…
Browse files Browse the repository at this point in the history
…ode in DeleteJobCommand (apache#47234)

### What problem does this PR solve?
Remove dependency of old planner partition prune code in
DeleteJobCommand
  • Loading branch information
Jibing-Li authored Jan 22, 2025
1 parent c9cac8f commit 26b5baf
Show file tree
Hide file tree
Showing 5 changed files with 753 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
Expand Down Expand Up @@ -103,8 +104,8 @@ public void processEmptyRelation(QueryState execState) {
/**
* used for Nereids planner
*/
public void process(Database targetDb, OlapTable targetTbl, List<String> partitionNames,
List<Predicate> deleteConditions, QueryState execState) {
public void process(Database targetDb, OlapTable targetTbl, List<Partition> selectedPartitions,
List<Predicate> deleteConditions, QueryState execState, List<String> partitionNames) {
DeleteJob deleteJob = null;
try {
targetTbl.readLock();
Expand All @@ -114,10 +115,11 @@ public void process(Database targetDb, OlapTable targetTbl, List<String> partiti
// just add a comment here to notice.
}
deleteJob = DeleteJob.newBuilder()
.buildWith(new DeleteJob.BuildParams(
.buildWithNereids(new DeleteJob.BuildParams(
targetDb,
targetTbl,
partitionNames,
selectedPartitions,
deleteConditions));

long txnId = deleteJob.beginTxn();
Expand Down
57 changes: 55 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ public static class BuildParams {
private final Collection<String> partitionNames;

private final List<Predicate> deleteConditions;
private final List<Partition> selectedPartitions;

public BuildParams(Database db, OlapTable table,
Collection<String> partitionNames,
Expand All @@ -516,6 +517,18 @@ public BuildParams(Database db, OlapTable table,
this.table = table;
this.partitionNames = partitionNames;
this.deleteConditions = deleteConditions;
this.selectedPartitions = null;
}

public BuildParams(Database db, OlapTable table,
List<String> partitionNames,
List<Partition> selectedPartitions,
List<Predicate> deleteConditions) {
this.db = db;
this.table = table;
this.partitionNames = partitionNames;
this.deleteConditions = deleteConditions;
this.selectedPartitions = selectedPartitions;
}

public OlapTable getTable() {
Expand All @@ -533,14 +546,54 @@ public Database getDb() {
public List<Predicate> getDeleteConditions() {
return deleteConditions;
}

public List<Partition> getSelectedPartitions() {
return selectedPartitions;
}
}

public static class Builder {

public DeleteJob buildWithNereids(BuildParams params) {
boolean noPartitionSpecified = params.getPartitionNames().isEmpty();
List<Partition> partitions = params.getSelectedPartitions();
Map<Long, Short> partitionReplicaNum = partitions.stream()
.collect(Collectors.toMap(
Partition::getId,
partition ->
params.getTable()
.getPartitionInfo()
.getReplicaAllocation(partition.getId())
.getTotalReplicaNum()));
// generate label
String label = DELETE_PREFIX + UUID.randomUUID();
//generate jobId
long jobId = Env.getCurrentEnv().getNextId();
List<String> partitionNames = partitions.stream().map(Partition::getName).collect(Collectors.toList());
List<Long> partitionIds = partitions.stream().map(Partition::getId).collect(Collectors.toList());
DeleteInfo deleteInfo = new DeleteInfo(params.getDb().getId(), params.getTable().getId(),
params.getTable().getName(), getDeleteCondString(params.getDeleteConditions()),
noPartitionSpecified, partitionIds, partitionNames);
DeleteJob deleteJob = ConnectContext.get() != null && ConnectContext.get().isTxnModel()
? new TxnDeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo)
: new DeleteJob(jobId, -1, label, partitionReplicaNum, deleteInfo);
long replicaNum = partitions.stream().mapToLong(Partition::getAllReplicaCount).sum();
deleteJob.setPartitions(partitions);
deleteJob.setDeleteConditions(params.getDeleteConditions());
deleteJob.setTargetDb(params.getDb());
deleteJob.setTargetTbl(params.getTable());
deleteJob.setCountDownLatch(new MarkedCountDownLatch<>((int) replicaNum));
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null) {
deleteJob.setTimeoutS(connectContext.getExecTimeout());
}
return deleteJob;
}

public DeleteJob buildWith(BuildParams params) throws Exception {
boolean noPartitionSpecified = params.getPartitionNames().isEmpty();
List<Partition> partitions = getSelectedPartitions(params.getTable(),
params.getPartitionNames(), params.getDeleteConditions());
List<Partition> partitions = getSelectedPartitions(params.getTable(), params.getPartitionNames(),
params.getDeleteConditions());
Map<Long, Short> partitionReplicaNum = partitions.stream()
.collect(Collectors.toMap(
Partition::getId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,36 @@
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.expression.rules.PartitionPruner;
import org.apache.doris.nereids.rules.expression.rules.PartitionPruner.PartitionTableType;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.InPredicate;
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
Expand Down Expand Up @@ -75,17 +86,23 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* delete from unique key table.
*/
public class DeleteFromCommand extends Command implements ForwardWithSync, Explainable {
private static final Logger LOG = LogManager.getLogger(DeleteFromCommand.class);

protected final List<String> nameParts;
protected final String tableAlias;
Expand Down Expand Up @@ -195,10 +212,14 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// just throw exception to fallback until storage support true predicate.
throw new AnalysisException("delete all rows is forbidden temporary.");
}

ArrayList<String> partitionNames = Lists.newArrayList(relation.getPartNames());
List<Partition> selectedPartitions = getSelectedPartitions(olapTable, filter, partitionNames);

Env.getCurrentEnv()
.getDeleteHandler()
.process((Database) scan.getDatabase(), scan.getTable(),
Lists.newArrayList(relation.getPartNames()), predicates, ctx.getState());
selectedPartitions, predicates, ctx.getState(), partitionNames);
}

private void updateSessionVariableForDelete(SessionVariable sessionVariable) {
Expand All @@ -219,6 +240,52 @@ private void updateSessionVariableForDelete(SessionVariable sessionVariable) {
}
}

private List<Partition> getSelectedPartitions(OlapTable olapTable, PhysicalFilter<?> filter,
List<String> partitionNames) {
// For un_partitioned table, return all partitions.
if (olapTable.getPartitionInfo().getType().equals(PartitionType.UNPARTITIONED)) {
return Lists.newArrayList(olapTable.getPartitions());
}
List<Slot> partitionSlots = Lists.newArrayList();
for (Column c : olapTable.getPartitionColumns()) {
Slot partitionSlot = null;
// loop search is faster than build a map
for (Slot slot : filter.getOutput()) {
if (slot.getName().equalsIgnoreCase(c.getName())) {
partitionSlot = slot;
break;
}
}
if (partitionSlot != null) {
partitionSlots.add(partitionSlot);
}
}
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
Map<Long, PartitionItem> idToPartitions = partitionInfo.getIdToItem(false);
Optional<SortedPartitionRanges<Long>> sortedPartitionRanges = Optional.empty();
// User specified partition is not empty.
if (partitionNames != null && !partitionNames.isEmpty()) {
Set<Long> partitionIds = partitionNames.stream()
.map(olapTable::getPartition)
.map(Partition::getId)
.collect(Collectors.toSet());
idToPartitions = idToPartitions.keySet().stream()
.filter(partitionIds::contains)
.collect(Collectors.toMap(Function.identity(), idToPartitions::get));
} else {
Optional<SortedPartitionRanges<?>> sortedPartitionRangesOpt
= Env.getCurrentEnv().getSortedPartitionsCacheManager().get(olapTable);
if (sortedPartitionRangesOpt.isPresent()) {
sortedPartitionRanges = (Optional) sortedPartitionRangesOpt;
}
}
List<Long> prunedPartitions = PartitionPruner.prune(
partitionSlots, filter.getPredicate(), idToPartitions,
CascadesContext.initContext(new StatementContext(), this, PhysicalProperties.ANY),
PartitionTableType.OLAP, sortedPartitionRanges);
return prunedPartitions.stream().map(olapTable::getPartition).collect(Collectors.toList());
}

private void checkColumn(Set<String> tableColumns, SlotReference slotReference, OlapTable table) {
// 0. must slot from table
if (!slotReference.getColumn().isPresent()) {
Expand Down
Loading

0 comments on commit 26b5baf

Please sign in to comment.