Skip to content

Commit

Permalink
[enhancement](Nereids) add interface for binary search filtering prun…
Browse files Browse the repository at this point in the history
…e external table's partitions (#47781)

1. add interface `SupportBinarySearchFilteringPartitions` so that binary
search filtering can also be used to prune an external table's partitions
2. skip binary search if the partition metadata changed less than 10s ago
  • Loading branch information
924060929 authored Feb 14, 2025
1 parent 51189e7 commit 3be57eb
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 24 deletions.
19 changes: 18 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.hint.UseMvHint;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
Expand Down Expand Up @@ -128,9 +129,25 @@
* Internal representation of tableFamilyGroup-related metadata. A OlaptableFamilyGroup contains several tableFamily.
* Note: when you add a new olap table property, you should modify TableProperty class
*/
public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProcessable {
public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProcessable,
SupportBinarySearchFilteringPartitions {
private static final Logger LOG = LogManager.getLogger(OlapTable.class);

/**
* Returns this table's partition items keyed by partition id, as provided by
* {@code getPartitionInfo().getIdToItem(false)}.
*
* NOTE(review): the {@code false} argument presumably excludes temporary partitions —
* confirm against PartitionInfo#getIdToItem.
*
* @param scan the relation being planned; not used by this implementation
* @return the partition-id to partition-item map; no ordering is applied here
*/
@Override
public Map<Long, PartitionItem> getOriginPartitions(CatalogRelation scan) {
return getPartitionInfo().getIdToItem(false);
}

/**
* Returns the table's visible version as the partition meta version.
* NereidsSortedPartitionsCacheManager compares this value against the cached one with
* {@code Objects.equals} and reloads the sorted partitions when they differ.
*
* @param scan the relation being planned; not used by this implementation
* @return the value of {@code getVisibleVersion()}
*/
@Override
public Object getPartitionMetaVersion(CatalogRelation scan) {
return getVisibleVersion();
}

/**
* Returns the time of the table's visible version as the partition meta load time.
* NereidsSortedPartitionsCacheManager skips building the sorted partition cache when this
* time is within 10 seconds of (or later than) the current time.
*
* @param scan the relation being planned; not used by this implementation
* @return the value of {@code getVisibleVersionTime()}, interpreted by the caller as epoch milliseconds
*/
@Override
public long getPartitionMetaLoadTimeMillis(CatalogRelation scan) {
return getVisibleVersionTime();
}

public enum OlapTableState {
NORMAL,
ROLLUP,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;

import java.util.Map;

/**
* SupportBinarySearchFilteringPartitions: implemented by tables whose partitions can be pruned
* with binary search. NereidsSortedPartitionsCacheManager uses the three methods below to build,
* validate and cache the sorted partition ranges of such a table.
*/
public interface SupportBinarySearchFilteringPartitions extends TableIf {
/**
* Returns the original partitions of this table, which may not be sorted. The
* NereidsSortedPartitionsCacheManager sorts these partitions and caches the result in the frontend.
* An implementation can save the partition meta snapshot id in the CatalogRelation and look up
* the partitions by that snapshot id.
*
* @param scan the relation being planned; implementations may ignore it
* @return a map from partition key (any key type) to partition item
*/
Map<?, PartitionItem> getOriginPartitions(CatalogRelation scan);

/**
* Returns the version of the partition meta. When the version differs from the cached one
* (compared with {@code Objects.equals}), the stale sorted partitions are discarded and reloaded.
* Must not return null: the cache manager stores the version through {@code Objects.requireNonNull}.
*
* @param scan the relation being planned; implementations may ignore it
* @return a non-null version object with a meaningful {@code equals}
*/
Object getPartitionMetaVersion(CatalogRelation scan);

/**
* Returns when the partition meta was loaded, in milliseconds. If the meta was loaded within
* the last 10 seconds (or the returned time is not earlier than the current time), the cache
* manager does not sort the partitions and binary search is not used to filter them.
*
* @param scan the relation being planned; implementations may ignore it
* @return the load time, compared by the caller against {@code System.currentTimeMillis()}
*/
long getPartitionMetaLoadTimeMillis(CatalogRelation scan);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.SupportBinarySearchFilteringPartitions;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.datasource.CatalogIf;
Expand All @@ -30,6 +29,7 @@
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndId;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
Expand All @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;

/**
Expand All @@ -62,7 +63,8 @@ public NereidsSortedPartitionsCacheManager() {
);
}

public Optional<SortedPartitionRanges<?>> get(OlapTable table) {
public Optional<SortedPartitionRanges<?>> get(
SupportBinarySearchFilteringPartitions table, CatalogRelation scan) {
DatabaseIf<?> database = table.getDatabase();
if (database == null) {
return Optional.empty();
Expand All @@ -75,25 +77,33 @@ public Optional<SortedPartitionRanges<?>> get(OlapTable table) {
catalog.getName(), database.getFullName(), table.getName());
PartitionCacheContext partitionCacheContext = partitionCaches.getIfPresent(key);
if (partitionCacheContext == null) {
return Optional.of(loadCache(key, table));
return Optional.ofNullable(loadCache(key, table, scan));
}
if (table.getId() != partitionCacheContext.tableId
|| table.getVisibleVersion() != partitionCacheContext.tableVersion) {
|| !Objects.equals(table.getPartitionMetaVersion(scan), partitionCacheContext.partitionMetaVersion)) {
partitionCaches.invalidate(key);
return Optional.of(loadCache(key, table));
return Optional.ofNullable(loadCache(key, table, scan));
}
return Optional.of(partitionCacheContext.sortedPartitionRanges);
}

private SortedPartitionRanges<?> loadCache(TableIdentifier key, OlapTable olapTable) {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
Map<Long, PartitionItem> allPartitions = partitionInfo.getIdToItem(false);
List<Entry<Long, PartitionItem>> sortedList = Lists.newArrayList(allPartitions.entrySet());
List<PartitionItemAndRange<?>> sortedRanges = Lists.newArrayListWithCapacity(allPartitions.size());
private SortedPartitionRanges<?> loadCache(
TableIdentifier key, SupportBinarySearchFilteringPartitions table, CatalogRelation scan) {
long now = System.currentTimeMillis();
long partitionMetaLoadTime = table.getPartitionMetaLoadTimeMillis(scan);

// if insert too frequently, we will skip sort partitions
if (now <= partitionMetaLoadTime || (now - partitionMetaLoadTime) <= (10 * 1000)) {
return null;
}

Map<?, PartitionItem> unsortedMap = table.getOriginPartitions(scan);
List<Entry<?, PartitionItem>> unsortedList = Lists.newArrayList(unsortedMap.entrySet());
List<PartitionItemAndRange<?>> sortedRanges = Lists.newArrayListWithCapacity(unsortedMap.size());
List<PartitionItemAndId<?>> defaultPartitions = Lists.newArrayList();
for (Entry<Long, PartitionItem> entry : sortedList) {
for (Entry<?, PartitionItem> entry : unsortedList) {
PartitionItem partitionItem = entry.getValue();
Long id = entry.getKey();
Object id = entry.getKey();
if (!partitionItem.isDefaultPartition()) {
List<Range<MultiColumnBound>> ranges = PartitionItemToRange.toRanges(partitionItem);
for (Range<MultiColumnBound> range : ranges) {
Expand All @@ -118,7 +128,7 @@ private SortedPartitionRanges<?> loadCache(TableIdentifier key, OlapTable olapTa
sortedRanges, defaultPartitions
);
PartitionCacheContext context = new PartitionCacheContext(
olapTable.getId(), olapTable.getVisibleVersion(), sortedPartitionRanges);
table.getId(), table.getPartitionMetaVersion(scan), sortedPartitionRanges);
partitionCaches.put(key, context);
return sortedPartitionRanges;
}
Expand Down Expand Up @@ -166,20 +176,21 @@ private static class TableIdentifier {

private static class PartitionCacheContext {
private final long tableId;
private final long tableVersion;
private final Object partitionMetaVersion;
private final SortedPartitionRanges sortedPartitionRanges;

public PartitionCacheContext(
long tableId, long tableVersion, SortedPartitionRanges sortedPartitionRanges) {
long tableId, Object partitionMetaVersion, SortedPartitionRanges sortedPartitionRanges) {
this.tableId = tableId;
this.tableVersion = tableVersion;
this.partitionMetaVersion
= Objects.requireNonNull(partitionMetaVersion, "partitionMetaVersion cannot be null");
this.sortedPartitionRanges = sortedPartitionRanges;
}

@Override
public String toString() {
return "PartitionCacheContext(tableId="
+ tableId + ", tableVersion=" + tableVersion
+ tableId + ", tableVersion=" + partitionMetaVersion
+ ", partitionNum=" + sortedPartitionRanges.sortedPartitions.size() + ")";
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package org.apache.doris.nereids.rules.rewrite;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.SupportBinarySearchFilteringPartitions;
import org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.expression.rules.PartitionPruner;
import org.apache.doris.nereids.rules.expression.rules.PartitionPruner.PartitionTableType;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
Expand All @@ -36,6 +40,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -89,8 +94,17 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable,
.collect(Collectors.toList());

Map<String, PartitionItem> nameToPartitionItem = scan.getSelectedPartitions().selectedPartitions;
Optional<SortedPartitionRanges<String>> sortedPartitionRanges = Optional.empty();
if (externalTable instanceof SupportBinarySearchFilteringPartitions) {
NereidsSortedPartitionsCacheManager partitionsCacheManager = Env.getCurrentEnv()
.getSortedPartitionsCacheManager();
sortedPartitionRanges = (Optional) partitionsCacheManager.get(
(SupportBinarySearchFilteringPartitions) externalTable, scan);
}

List<String> prunedPartitions = new ArrayList<>(PartitionPruner.prune(
partitionSlots, filter.getPredicate(), nameToPartitionItem, ctx, PartitionTableType.EXTERNAL));
partitionSlots, filter.getPredicate(), nameToPartitionItem, ctx,
PartitionTableType.EXTERNAL, sortedPartitionRanges));

for (String name : prunedPartitions) {
selectedPartitionItems.put(name, nameToPartitionItem.get(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public Rule build() {
Map<Long, PartitionItem> idToPartitions;
Optional<SortedPartitionRanges<Long>> sortedPartitionRanges = Optional.empty();
if (manuallySpecifiedPartitions.isEmpty()) {
Optional<SortedPartitionRanges<?>> sortedPartitionRangesOpt = sortedPartitionsCacheManager.get(table);
Optional<SortedPartitionRanges<?>> sortedPartitionRangesOpt
= sortedPartitionsCacheManager.get(table, scan);
if (sortedPartitionRangesOpt.isPresent()) {
sortedPartitionRanges = (Optional) sortedPartitionRangesOpt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
}

ArrayList<String> partitionNames = Lists.newArrayList(relation.getPartNames());
List<Partition> selectedPartitions = getSelectedPartitions(olapTable, filter, partitionNames);
List<Partition> selectedPartitions = getSelectedPartitions(olapTable, filter, scan, partitionNames);

Env.getCurrentEnv()
.getDeleteHandler()
Expand All @@ -240,7 +240,9 @@ private void updateSessionVariableForDelete(SessionVariable sessionVariable) {
}
}

private List<Partition> getSelectedPartitions(OlapTable olapTable, PhysicalFilter<?> filter,
private List<Partition> getSelectedPartitions(
OlapTable olapTable, PhysicalFilter<?> filter,
PhysicalOlapScan scan,
List<String> partitionNames) {
// For un_partitioned table, return all partitions.
if (olapTable.getPartitionInfo().getType().equals(PartitionType.UNPARTITIONED)) {
Expand Down Expand Up @@ -274,7 +276,7 @@ private List<Partition> getSelectedPartitions(OlapTable olapTable, PhysicalFilte
.collect(Collectors.toMap(Function.identity(), idToPartitions::get));
} else {
Optional<SortedPartitionRanges<?>> sortedPartitionRangesOpt
= Env.getCurrentEnv().getSortedPartitionsCacheManager().get(olapTable);
= Env.getCurrentEnv().getSortedPartitionsCacheManager().get(olapTable, scan);
if (sortedPartitionRangesOpt.isPresent()) {
sortedPartitionRanges = (Optional) sortedPartitionRangesOpt;
}
Expand Down

0 comments on commit 3be57eb

Please sign in to comment.