Skip to content

Commit

Permalink
filter push down
Browse files Browse the repository at this point in the history
  • Loading branch information
Tan-JiaLiang committed Nov 29, 2024
1 parent 4515f1c commit d5dc320
Show file tree
Hide file tree
Showing 32 changed files with 789 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateVisitor;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
 * Visit the predicate and extract the file index fallback predicate.
 *
 * <p>A leaf predicate whose {@link FieldRef} is contained in the given field set is considered
 * covered and is dropped; whatever is left over is returned as the fallback predicate. An empty
 * {@link Optional} means that nothing is left over.
 *
 * <p>NOTE(review): the field set is presumably the set of fields already answered by a file index
 * (e.g. the value of {@code FileIndexResult#applyIndexes()}) - confirm against the caller.
 */
public class FileIndexFilterFallbackPredicateVisitor
        implements PredicateVisitor<Optional<Predicate>> {

    /** Fields whose leaf predicates are treated as covered. Held by reference, not copied. */
    private final Set<FieldRef> fields;

    /**
     * @param fields fields whose leaf predicates must be left out of the fallback predicate
     */
    public FileIndexFilterFallbackPredicateVisitor(Set<FieldRef> fields) {
        this.fields = fields;
    }

    /**
     * @return empty if the field of {@code predicate} is covered, otherwise {@code predicate}
     *     itself
     */
    @Override
    public Optional<Predicate> visit(LeafPredicate predicate) {
        if (fields.contains(predicate.fieldRef())) {
            return Optional.empty();
        }
        return Optional.of(predicate);
    }

    /**
     * Computes the fallback of a compound predicate from the fallbacks of its children.
     *
     * <ul>
     *   <li>If no child has a fallback, the compound predicate is fully covered and the result
     *       is empty.
     *   <li>If the function is an {@link Or} and at least one child has a fallback, the whole
     *       disjunction is returned unchanged. Dropping the covered disjuncts would produce a
     *       stricter predicate than the original and could reject rows that only match a
     *       covered disjunct.
     *   <li>For any other function (e.g. a conjunction) only the children's fallbacks are kept;
     *       a single remaining fallback is returned directly instead of being re-wrapped.
     * </ul>
     *
     * @return the part of {@code predicate} that still has to be evaluated, or empty
     */
    @Override
    public Optional<Predicate> visit(CompoundPredicate predicate) {
        List<Predicate> remaining = new ArrayList<>();
        for (Predicate child : predicate.children()) {
            child.visit(this).ifPresent(remaining::add);
        }
        if (remaining.isEmpty()) {
            return Optional.empty();
        }
        if (predicate.function() instanceof Or) {
            // A disjunction cannot be narrowed child by child: keep it as a whole.
            return Optional.of(predicate);
        }
        if (remaining.size() == 1) {
            return Optional.of(remaining.get(0));
        }
        return Optional.of(new CompoundPredicate(predicate.function(), remaining));
    }
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fileindex;

import org.apache.paimon.predicate.FieldRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,39 @@
* limitations under the License.
*/

package org.apache.paimon.predicate;
package org.apache.paimon.fileindex;

import org.apache.paimon.fileindex.FileIndexCommon;
import org.apache.paimon.fileindex.FileIndexFilterPushDownAnalyzer;
import org.apache.paimon.fileindex.FileIndexOptions.Column;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateVisitor;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Visit the predicate and check if index can push down the predicate. */
public class FileIndexPredicateVisitor implements PredicateVisitor<Boolean> {
public class FileIndexFilterPushDownVisitor implements PredicateVisitor<Boolean> {

private final Map<Column, List<FileIndexFilterPushDownAnalyzer>> analyzers;
private final Map<String, List<FileIndexFilterPushDownAnalyzer>> analyzers;

public FileIndexPredicateVisitor(Map<Column, List<FileIndexFilterPushDownAnalyzer>> analyzers) {
/** Creates a visitor without any analyzers by delegating to the map constructor with an empty map. */
public FileIndexFilterPushDownVisitor() {
this(Collections.emptyMap());
}

/**
 * Creates a visitor backed by the given analyzers.
 *
 * @param analyzers analyzers per map key; the map is stored by reference, not copied.
 *     NOTE(review): keys are presumably field names, since the leaf visit looks entries up by
 *     {@code predicate.fieldName()} - confirm.
 */
public FileIndexFilterPushDownVisitor(
Map<String, List<FileIndexFilterPushDownAnalyzer>> analyzers) {
this.analyzers = analyzers;
}

@Override
public Boolean visit(LeafPredicate predicate) {
for (Map.Entry<Column, List<FileIndexFilterPushDownAnalyzer>> entry : analyzers.entrySet()) {
Column column = entry.getKey();
String fieldName = column.getColumnName();
if (column.isNestedColumn()) {
fieldName = FileIndexCommon.toMapKey(column.getColumnName(), column.getNestedColumnName());
}

if (!fieldName.equals(predicate.fieldName())) {
continue;
}

List<FileIndexFilterPushDownAnalyzer> analyzers = entry.getValue();
for (FileIndexFilterPushDownAnalyzer analyzer : analyzers) {
if (analyzer.visit(predicate)) {
return true;
}
List<FileIndexFilterPushDownAnalyzer> analyzers =
this.analyzers.getOrDefault(predicate.fieldName(), Collections.emptyList());
for (FileIndexFilterPushDownAnalyzer analyzer : analyzers) {
if (analyzer.visit(predicate)) {
return true;
}
}
return false;
Expand All @@ -68,4 +65,9 @@ public Boolean visit(CompoundPredicate predicate) {
}
return true;
}

/**
 * Returns the analyzer map held by this visitor. The internal map itself is returned, not a
 * copy.
 */
@VisibleForTesting
public Map<String, List<FileIndexFilterPushDownAnalyzer>> getAnalyzers() {
return analyzers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -172,21 +173,43 @@ public Options getMapTopLevelOptions(String column, String indexType) {
+ indexType));
}

public Map<Column, List<FileIndexFilterPushDownAnalyzer>> getFileIndexFilterPushDownAnalyzer(RowType rowType) {
Map<Column, List<FileIndexFilterPushDownAnalyzer>> results = new HashMap<>();
public FileIndexFilterPushDownVisitor createFilterPushDownPredicateVisitor(RowType rowType) {
Map<String, List<FileIndexFilterPushDownAnalyzer>> analyzers = new HashMap<>();
for (Map.Entry<Column, Map<String, Options>> entry : indexTypeOptions.entrySet()) {
Column key = entry.getKey();
Map<String, Options> options = entry.getValue();
for (Map.Entry<String, Options> optionsEntry : options.entrySet()) {
DataField field = rowType.getField(key.columnName);
FileIndexFilterPushDownAnalyzer analyzer =
FileIndexer.create(
optionsEntry.getKey(), field.type(), optionsEntry.getValue())
.createFilterPushDownAnalyzer();
results.computeIfAbsent(key, k -> new ArrayList<>()).add(analyzer);
Column column = entry.getKey();
for (Map.Entry<String, Options> typeEntry : entry.getValue().entrySet()) {
String key;
FileIndexFilterPushDownAnalyzer analyzer;
DataField field = rowType.getField(column.columnName);
Options options = typeEntry.getValue();
if (column.isNestedColumn) {
if (field.type().getTypeRoot() != DataTypeRoot.MAP) {
throw new IllegalArgumentException(
"Column "
+ column.columnName
+ " is nested column, but is not map type. Only should map type yet.");
}
MapType mapType = (MapType) field.type();
Options mapTopLevelOptions =
getMapTopLevelOptions(column.columnName, typeEntry.getKey());
key = FileIndexCommon.toMapKey(column.columnName, column.nestedColumnName);
analyzer =
FileIndexer.create(
typeEntry.getKey(),
mapType.getValueType(),
new Options(
mapTopLevelOptions.toMap(), options.toMap()))
.createFilterPushDownAnalyzer();
} else {
key = column.columnName;
analyzer =
FileIndexer.create(typeEntry.getKey(), field.type(), options)
.createFilterPushDownAnalyzer();
}
analyzers.computeIfAbsent(key, k -> new ArrayList<>()).add(analyzer);
}
}
return results;
return new FileIndexFilterPushDownVisitor(analyzers);
}

public boolean isEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package org.apache.paimon.fileindex;

import org.apache.paimon.predicate.FieldRef;

import java.util.Collections;
import java.util.Set;

/** File index result used to decide whether to filter out a file. */
public interface FileIndexResult {

Expand Down Expand Up @@ -59,6 +64,10 @@ public FileIndexResult or(FileIndexResult fileIndexResult) {

boolean remain();

/**
 * Returns the fields associated with this result. The default implementation returns an empty
 * set; implementations may override it.
 */
default Set<FieldRef> applyIndexes() {
return Collections.emptySet();
}

default FileIndexResult and(FileIndexResult fileIndexResult) {
if (fileIndexResult.remain()) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
@Override
public FileIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
return new BitmapIndexResult(
Collections.singleton(fieldRef),
() -> {
readInternalMeta(fieldRef.type());
return getInListResultBitmap(literals);
Expand All @@ -204,6 +205,7 @@ public FileIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
@Override
public FileIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
return new BitmapIndexResult(
Collections.singleton(fieldRef),
() -> {
readInternalMeta(fieldRef.type());
RoaringBitmap32 bitmap = getInListResultBitmap(literals);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,40 @@
package org.apache.paimon.fileindex.bitmap;

import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.utils.LazyField;
import org.apache.paimon.utils.RoaringBitmap32;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

/** Bitmap file index result. */
public class BitmapIndexResult extends LazyField<RoaringBitmap32> implements FileIndexResult {

public BitmapIndexResult(Supplier<RoaringBitmap32> supplier) {
private final Set<FieldRef> fields;

/**
 * Creates a bitmap result.
 *
 * @param fields fields to report from {@link #applyIndexes()}; copied into a new {@link HashSet}
 * @param supplier supplier of the bitmap, passed on to the super constructor
 */
public BitmapIndexResult(Set<FieldRef> fields, Supplier<RoaringBitmap32> supplier) {
super(supplier);
// Copy so that the set owned by this result is independent of the caller's set.
this.fields = new HashSet<>(fields);
}

/** Returns {@code true} when the bitmap obtained from {@code get()} is not empty. */
@Override
public boolean remain() {
return !get().isEmpty();
}

/** Returns the field set held by this result. The internal set itself is returned, not a copy. */
@Override
public Set<FieldRef> applyIndexes() {
return fields;
}

@Override
public FileIndexResult and(FileIndexResult fileIndexResult) {
if (fileIndexResult instanceof BitmapIndexResult) {
fields.addAll(fileIndexResult.applyIndexes());
return new BitmapIndexResult(
fields,
() -> RoaringBitmap32.and(get(), ((BitmapIndexResult) fileIndexResult).get()));
}
return FileIndexResult.super.and(fileIndexResult);
Expand All @@ -48,7 +61,9 @@ public FileIndexResult and(FileIndexResult fileIndexResult) {
@Override
public FileIndexResult or(FileIndexResult fileIndexResult) {
if (fileIndexResult instanceof BitmapIndexResult) {
fields.addAll(fileIndexResult.applyIndexes());
return new BitmapIndexResult(
fields,
() -> RoaringBitmap32.or(get(), ((BitmapIndexResult) fileIndexResult).get()));
}
return FileIndexResult.super.and(fileIndexResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public Reader(
@Override
public FileIndexResult visitIsNull(FieldRef fieldRef) {
return new BitmapIndexResult(
Collections.singleton(fieldRef),
() -> {
RoaringBitmap32 bitmap =
RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull());
Expand All @@ -226,6 +227,7 @@ public FileIndexResult visitIsNull(FieldRef fieldRef) {
/**
 * Builds the result for an is-not-null test on {@code fieldRef}.
 *
 * <p>The result reports {@code fieldRef} as its only field. Its bitmap supplier computes the
 * union of {@code positive.isNotNull()} and {@code negative.isNotNull()}.
 */
@Override
public FileIndexResult visitIsNotNull(FieldRef fieldRef) {
return new BitmapIndexResult(
Collections.singleton(fieldRef),
() -> RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull()));
}

Expand All @@ -242,6 +244,7 @@ public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
@Override
public FileIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
return new BitmapIndexResult(
Collections.singleton(fieldRef),
() ->
literals.stream()
.map(valueMapper)
Expand All @@ -261,6 +264,7 @@ public FileIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
@Override
public FileIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
return new BitmapIndexResult(
Collections.singleton(fieldRef),
() -> {
RoaringBitmap32 ebm =
RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull());
Expand All @@ -285,6 +289,7 @@ public FileIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
@Override
public FileIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
return new BitmapIndexResult(
Collections.singleton(fieldRef),
() -> {
Long value = valueMapper.apply(literal);
if (value < 0) {
Expand All @@ -298,6 +303,7 @@ public FileIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
@Override
public FileIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) {
return new BitmapIndexResult(
Collections.singleton(fieldRef),
() -> {
Long value = valueMapper.apply(literal);
if (value < 0) {
Expand All @@ -311,6 +317,7 @@ public FileIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) {
@Override
public FileIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) {
return new BitmapIndexResult(
Collections.singleton(fieldRef),
() -> {
Long value = valueMapper.apply(literal);
if (value < 0) {
Expand All @@ -325,6 +332,7 @@ public FileIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) {
@Override
public FileIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return new BitmapIndexResult(
Collections.singleton(fieldRef),
() -> {
Long value = valueMapper.apply(literal);
if (value < 0) {
Expand Down
Loading

0 comments on commit d5dc320

Please sign in to comment.