From d5dc32040c55231dfc4cbfeb179f1375357c196e Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Fri, 29 Nov 2024 14:14:55 +0800 Subject: [PATCH] filter push down --- ...leIndexFilterFallbackPredicateVisitor.java | 63 +++++++++ .../FileIndexFilterPushDownAnalyzer.java | 18 +++ .../FileIndexFilterPushDownVisitor.java} | 48 +++---- .../paimon/fileindex/FileIndexOptions.java | 49 +++++-- .../paimon/fileindex/FileIndexResult.java | 9 ++ .../fileindex/bitmap/BitmapFileIndex.java | 2 + .../fileindex/bitmap/BitmapIndexResult.java | 17 ++- .../bsi/BitSliceIndexBitmapFileIndex.java | 8 ++ .../paimon/reader/FileRecordReader.java | 23 ++++ ...dexFilterFallbackPredicateVisitorTest.java | 111 ++++++++++++++++ .../FileIndexFilterPushDownVisitorTest.java | 123 ++++++++++++++++++ .../paimon/operation/MergeFileSplitRead.java | 48 +++++-- .../paimon/operation/RawFileSplitRead.java | 25 +++- .../apache/paimon/operation/SplitRead.java | 8 ++ .../paimon/table/AbstractFileStoreTable.java | 8 ++ .../table/AppendOnlyFileStoreTable.java | 6 + .../java/org/apache/paimon/table/Table.java | 7 + .../paimon/table/source/InnerTableRead.java | 7 + .../table/source/KeyValueTableRead.java | 11 ++ .../paimon/table/source/ReadBuilder.java | 3 + .../paimon/table/source/ReadBuilderImpl.java | 11 ++ .../splitread/IncrementalDiffSplitRead.java | 6 + .../table/AppendOnlyFileStoreTableTest.java | 90 +++++++++++++ .../table/PrimaryKeyFileStoreTableTest.java | 70 ++++++++++ .../paimon/flink/source/DataTableSource.java | 4 + .../flink/source/BaseDataTableSource.java | 8 ++ .../paimon/flink/source/DataTableSource.java | 4 + .../flink/source/FlinkSourceBuilder.java | 9 ++ .../paimon/flink/source/FlinkTableSource.java | 32 ++--- .../flink/source/FilterPushDownITCase.java | 41 +++--- .../FileStoreTableStatisticsTestBase.java | 3 + .../PrimaryKeyTableStatisticsTest.java | 1 + 32 files changed, 789 insertions(+), 84 deletions(-) create mode 100644 
paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitor.java rename paimon-common/src/main/java/org/apache/paimon/{predicate/FileIndexPredicateVisitor.java => fileindex/FileIndexFilterPushDownVisitor.java} (54%) create mode 100644 paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitorTest.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitorTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitor.java new file mode 100644 index 0000000000000..4a77858204d77 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateVisitor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** Visit the predicate and extract the file index fallback predicate. */ +public class FileIndexFilterFallbackPredicateVisitor implements PredicateVisitor> { + + private final Set fields; + + public FileIndexFilterFallbackPredicateVisitor(Set fields) { + this.fields = fields; + } + + @Override + public Optional visit(LeafPredicate predicate) { + if (fields.contains(predicate.fieldRef())) { + return Optional.empty(); + } + return Optional.of(predicate); + } + + @Override + public Optional visit(CompoundPredicate predicate) { + List converted = new ArrayList<>(); + for (Predicate child : predicate.children()) { + child.visit(this).ifPresent(converted::add); + } + if (converted.isEmpty()) { + return Optional.empty(); + } + if (converted.size() == 1) { + return Optional.of(converted.get(0)); + } + return Optional.of(new CompoundPredicate(predicate.function(), converted)); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownAnalyzer.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownAnalyzer.java index 13d5660785927..7c21430639f15 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownAnalyzer.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownAnalyzer.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.paimon.fileindex; import org.apache.paimon.predicate.FieldRef; diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/FileIndexPredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitor.java similarity index 54% rename from paimon-common/src/main/java/org/apache/paimon/predicate/FileIndexPredicateVisitor.java rename to paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitor.java index eb7661eba3f55..062144a3fd973 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/FileIndexPredicateVisitor.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitor.java @@ -16,42 +16,39 @@ * limitations under the License. 
*/ -package org.apache.paimon.predicate; +package org.apache.paimon.fileindex; -import org.apache.paimon.fileindex.FileIndexCommon; -import org.apache.paimon.fileindex.FileIndexFilterPushDownAnalyzer; -import org.apache.paimon.fileindex.FileIndexOptions.Column; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateVisitor; +import java.util.Collections; import java.util.List; import java.util.Map; /** Visit the predicate and check if index can push down the predicate. */ -public class FileIndexPredicateVisitor implements PredicateVisitor { +public class FileIndexFilterPushDownVisitor implements PredicateVisitor { - private final Map> analyzers; + private final Map> analyzers; - public FileIndexPredicateVisitor(Map> analyzers) { + public FileIndexFilterPushDownVisitor() { + this(Collections.emptyMap()); + } + + public FileIndexFilterPushDownVisitor( + Map> analyzers) { this.analyzers = analyzers; } @Override public Boolean visit(LeafPredicate predicate) { - for (Map.Entry> entry : analyzers.entrySet()) { - Column column = entry.getKey(); - String fieldName = column.getColumnName(); - if (column.isNestedColumn()) { - fieldName = FileIndexCommon.toMapKey(column.getColumnName(), column.getNestedColumnName()); - } - - if (!fieldName.equals(predicate.fieldName())) { - continue; - } - - List analyzers = entry.getValue(); - for (FileIndexFilterPushDownAnalyzer analyzer : analyzers) { - if (analyzer.visit(predicate)) { - return true; - } + List analyzers = + this.analyzers.getOrDefault(predicate.fieldName(), Collections.emptyList()); + for (FileIndexFilterPushDownAnalyzer analyzer : analyzers) { + if (analyzer.visit(predicate)) { + return true; } } return false; @@ -68,4 +65,9 @@ public Boolean visit(CompoundPredicate predicate) { } return true; } + + @VisibleForTesting + 
public Map> getAnalyzers() { + return analyzers; + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java index de28accd515d7..ae63fd9538036 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java @@ -21,13 +21,14 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.StringUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -172,21 +173,43 @@ public Options getMapTopLevelOptions(String column, String indexType) { + indexType)); } - public Map> getFileIndexFilterPushDownAnalyzer(RowType rowType) { - Map> results = new HashMap<>(); + public FileIndexFilterPushDownVisitor createFilterPushDownPredicateVisitor(RowType rowType) { + Map> analyzers = new HashMap<>(); for (Map.Entry> entry : indexTypeOptions.entrySet()) { - Column key = entry.getKey(); - Map options = entry.getValue(); - for (Map.Entry optionsEntry : options.entrySet()) { - DataField field = rowType.getField(key.columnName); - FileIndexFilterPushDownAnalyzer analyzer = - FileIndexer.create( - optionsEntry.getKey(), field.type(), optionsEntry.getValue()) - .createFilterPushDownAnalyzer(); - results.computeIfAbsent(key, k -> new ArrayList<>()).add(analyzer); + Column column = entry.getKey(); + for (Map.Entry typeEntry : entry.getValue().entrySet()) { + String key; + FileIndexFilterPushDownAnalyzer analyzer; + DataField field = rowType.getField(column.columnName); + Options options = typeEntry.getValue(); + if 
(column.isNestedColumn) { + if (field.type().getTypeRoot() != DataTypeRoot.MAP) { + throw new IllegalArgumentException( + "Column " + + column.columnName + + " is nested column, but is not map type. Only should map type yet."); + } + MapType mapType = (MapType) field.type(); + Options mapTopLevelOptions = + getMapTopLevelOptions(column.columnName, typeEntry.getKey()); + key = FileIndexCommon.toMapKey(column.columnName, column.nestedColumnName); + analyzer = + FileIndexer.create( + typeEntry.getKey(), + mapType.getValueType(), + new Options( + mapTopLevelOptions.toMap(), options.toMap())) + .createFilterPushDownAnalyzer(); + } else { + key = column.columnName; + analyzer = + FileIndexer.create(typeEntry.getKey(), field.type(), options) + .createFilterPushDownAnalyzer(); + } + analyzers.computeIfAbsent(key, k -> new ArrayList<>()).add(analyzer); } } - return results; + return new FileIndexFilterPushDownVisitor(analyzers); } public boolean isEmpty() { diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexResult.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexResult.java index 3661a9b59a39b..15af1ab1993ce 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexResult.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexResult.java @@ -18,6 +18,11 @@ package org.apache.paimon.fileindex; +import org.apache.paimon.predicate.FieldRef; + +import java.util.Collections; +import java.util.Set; + /** File index result to decide whether filter a file. 
*/ public interface FileIndexResult { @@ -59,6 +64,10 @@ public FileIndexResult or(FileIndexResult fileIndexResult) { boolean remain(); + default Set applyIndexes() { + return Collections.emptySet(); + } + default FileIndexResult and(FileIndexResult fileIndexResult) { if (fileIndexResult.remain()) { return this; diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java index f89874b387ca7..641aadd02c4b9 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java @@ -195,6 +195,7 @@ public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) { @Override public FileIndexResult visitIn(FieldRef fieldRef, List literals) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { readInternalMeta(fieldRef.type()); return getInListResultBitmap(literals); @@ -204,6 +205,7 @@ public FileIndexResult visitIn(FieldRef fieldRef, List literals) { @Override public FileIndexResult visitNotIn(FieldRef fieldRef, List literals) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { readInternalMeta(fieldRef.type()); RoaringBitmap32 bitmap = getInListResultBitmap(literals); diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java index 8d572ff254fc3..6289962d24396 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java @@ -19,16 +19,22 @@ package org.apache.paimon.fileindex.bitmap; import org.apache.paimon.fileindex.FileIndexResult; +import org.apache.paimon.predicate.FieldRef; import org.apache.paimon.utils.LazyField; import 
org.apache.paimon.utils.RoaringBitmap32; +import java.util.HashSet; +import java.util.Set; import java.util.function.Supplier; /** bitmap file index result. */ public class BitmapIndexResult extends LazyField implements FileIndexResult { - public BitmapIndexResult(Supplier supplier) { + private final Set fields; + + public BitmapIndexResult(Set fields, Supplier supplier) { super(supplier); + this.fields = new HashSet<>(fields); } @Override @@ -36,10 +42,17 @@ public boolean remain() { return !get().isEmpty(); } + @Override + public Set applyIndexes() { + return fields; + } + @Override public FileIndexResult and(FileIndexResult fileIndexResult) { if (fileIndexResult instanceof BitmapIndexResult) { + fields.addAll(fileIndexResult.applyIndexes()); return new BitmapIndexResult( + fields, () -> RoaringBitmap32.and(get(), ((BitmapIndexResult) fileIndexResult).get())); } return FileIndexResult.super.and(fileIndexResult); @@ -48,7 +61,9 @@ public FileIndexResult and(FileIndexResult fileIndexResult) { @Override public FileIndexResult or(FileIndexResult fileIndexResult) { if (fileIndexResult instanceof BitmapIndexResult) { + fields.addAll(fileIndexResult.applyIndexes()); return new BitmapIndexResult( + fields, () -> RoaringBitmap32.or(get(), ((BitmapIndexResult) fileIndexResult).get())); } return FileIndexResult.super.and(fileIndexResult); diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java index da821b701ab77..243ba22c0d7e4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java @@ -215,6 +215,7 @@ public Reader( @Override public FileIndexResult visitIsNull(FieldRef fieldRef) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { RoaringBitmap32 bitmap = 
RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull()); @@ -226,6 +227,7 @@ public FileIndexResult visitIsNull(FieldRef fieldRef) { @Override public FileIndexResult visitIsNotNull(FieldRef fieldRef) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull())); } @@ -242,6 +244,7 @@ public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) { @Override public FileIndexResult visitIn(FieldRef fieldRef, List literals) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> literals.stream() .map(valueMapper) @@ -261,6 +264,7 @@ public FileIndexResult visitIn(FieldRef fieldRef, List literals) { @Override public FileIndexResult visitNotIn(FieldRef fieldRef, List literals) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { RoaringBitmap32 ebm = RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull()); @@ -285,6 +289,7 @@ public FileIndexResult visitNotIn(FieldRef fieldRef, List literals) { @Override public FileIndexResult visitLessThan(FieldRef fieldRef, Object literal) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { Long value = valueMapper.apply(literal); if (value < 0) { @@ -298,6 +303,7 @@ public FileIndexResult visitLessThan(FieldRef fieldRef, Object literal) { @Override public FileIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { Long value = valueMapper.apply(literal); if (value < 0) { @@ -311,6 +317,7 @@ public FileIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) { @Override public FileIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { Long value = valueMapper.apply(literal); if (value < 0) { @@ -325,6 +332,7 @@ public FileIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) { @Override public 
FileIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { Long value = valueMapper.apply(literal); if (value < 0) { diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java index 4d5356edf2757..189930816f1f0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java @@ -18,6 +18,8 @@ package org.apache.paimon.reader; +import org.apache.paimon.utils.Filter; + import javax.annotation.Nullable; import java.io.IOException; @@ -28,4 +30,25 @@ public interface FileRecordReader extends RecordReader { @Override @Nullable FileRecordIterator readBatch() throws IOException; + + @Override + default FileRecordReader filter(Filter filter) { + FileRecordReader thisReader = this; + return new FileRecordReader() { + @Nullable + @Override + public FileRecordIterator readBatch() throws IOException { + FileRecordIterator iterator = thisReader.readBatch(); + if (iterator == null) { + return null; + } + return iterator.filter(filter); + } + + @Override + public void close() throws IOException { + thisReader.close(); + } + }; + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitorTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitorTest.java new file mode 100644 index 0000000000000..92c2c0c20ff2e --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitorTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FileIndexFilterFallbackPredicateVisitor}. 
*/ +public class FileIndexFilterFallbackPredicateVisitorTest { + + @Test + public void testVisitLeafPredicate() { + RowType rowType = + new RowType( + Arrays.asList( + new DataField(0, "a", DataTypes.STRING()), + new DataField(1, "b", DataTypes.STRING()))); + Set indexFields = Collections.singleton(new FieldRef(0, "a", DataTypes.STRING())); + FileIndexFilterFallbackPredicateVisitor visitor = + new FileIndexFilterFallbackPredicateVisitor(indexFields); + + Predicate p1 = new PredicateBuilder(rowType).equal(0, "a"); + Optional r1 = visitor.visit((LeafPredicate) p1); + assertThat(r1.isPresent()).isFalse(); + + Predicate p2 = new PredicateBuilder(rowType).equal(1, "b"); + Optional r2 = visitor.visit((LeafPredicate) p2); + assertThat(r2).isPresent(); + assertThat(r2.get()).isEqualTo(p2); + } + + @Test + public void testVisitCompoundPredicate() { + RowType rowType = + new RowType( + Arrays.asList( + new DataField(0, "a", DataTypes.INT()), + new DataField(1, "b", DataTypes.STRING()), + new DataField(2, "c", DataTypes.INT()))); + + Predicate predicate = + PredicateBuilder.and( + new PredicateBuilder(rowType).greaterThan(0, 1), + new PredicateBuilder(rowType).equal(1, "b"), + new PredicateBuilder(rowType).lessThan(2, 2)); + + Set indexFields = + new HashSet<>( + Arrays.asList( + new FieldRef(0, "a", DataTypes.INT()), + new FieldRef(2, "c", DataTypes.INT()))); + FileIndexFilterFallbackPredicateVisitor v1 = new FileIndexFilterFallbackPredicateVisitor(indexFields); + Optional r1 = v1.visit((CompoundPredicate) predicate); + assertThat(r1).isPresent(); + assertThat(r1.get()).isEqualTo(new PredicateBuilder(rowType).equal(1, "b")); + + FileIndexFilterFallbackPredicateVisitor v2 = + new FileIndexFilterFallbackPredicateVisitor( + new HashSet<>( + Collections.singletonList(new FieldRef(0, "a", DataTypes.INT())))); + Optional r2 = v2.visit((CompoundPredicate) predicate); + assertThat(r2).isPresent(); + assertThat(r2.get()) + .isEqualTo( + PredicateBuilder.and( + new 
PredicateBuilder(rowType).equal(1, "b"), + new PredicateBuilder(rowType).lessThan(2, 2))); + + FileIndexFilterFallbackPredicateVisitor v3 = + new FileIndexFilterFallbackPredicateVisitor( + new HashSet<>( + Arrays.asList( + new FieldRef(0, "a", DataTypes.INT()), + new FieldRef(1, "b", DataTypes.STRING()), + new FieldRef(2, "c", DataTypes.INT())))); + Optional r3 = v3.visit((CompoundPredicate) predicate); + assertThat(r3.isPresent()).isFalse(); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitorTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitorTest.java new file mode 100644 index 0000000000000..b6235641bc918 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitorTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory; +import org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FileIndexFilterPushDownVisitor}. */ +public class FileIndexFilterPushDownVisitorTest { + + private RowType rowType; + private FileIndexFilterPushDownVisitor visitor; + + @BeforeEach + public void setup() { + this.rowType = + RowType.builder() + .field("a", DataTypes.INT()) + .field("b", DataTypes.INT()) + .field("c", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())) + .build(); + Map properties = new HashMap<>(); + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BloomFilterFileIndexFactory.BLOOM_FILTER + + "." + + CoreOptions.COLUMNS, + "a,c[hello]"); + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BloomFilterFileIndexFactory.BLOOM_FILTER + + "." + + "c[hello].fpp", + "200"); + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BitSliceIndexBitmapFileIndexFactory.BSI_INDEX + + "."
+ + CoreOptions.COLUMNS, + "a,b"); + FileIndexOptions fileIndexOptions = CoreOptions.fromMap(properties).indexColumnsOptions(); + this.visitor = fileIndexOptions.createFilterPushDownPredicateVisitor(rowType); + } + + @Test + public void testCreateFilterPushDownPredicateVisitor() { + Map> analyzers = visitor.getAnalyzers(); + assertThat(analyzers.size()).isEqualTo(3); + assertThat(analyzers.get("a").size()).isEqualTo(2); + assertThat(analyzers.get("b").size()).isEqualTo(1); + assertThat(analyzers.get("c[hello]").size()).isEqualTo(1); + } + + @Test + public void testVisitLeafPredicate() { + // test column a with bloom-filter and bsi index + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).equal(0, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).lessThan(0, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).greaterThan(0, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).startsWith(0, 1))) + .isFalse(); + + // test column b with bsi index + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).equal(1, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).lessThan(1, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).greaterThan(1, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).startsWith(0, 1))) + .isFalse(); + + // test map column c[hello] with bloom-filter index + assertThat( + visitor.visit( + new LeafPredicate( + Equal.INSTANCE, + DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), + 2, + "c[hello]", + Collections.singletonList(BinaryString.fromString("a"))))) + .isFalse(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java index 
23a3a576e4a6b..11d240b5dacc3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java @@ -26,6 +26,7 @@ import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.UnsupportedSchemeException; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.mergetree.DropDeleteReader; @@ -86,6 +87,8 @@ public class MergeFileSplitRead implements SplitRead { @Nullable private List filtersForAll; + @Nullable private Predicate indexFilter; + @Nullable private int[][] pushdownProjection; @Nullable private int[][] outerProjection; @@ -239,27 +242,48 @@ public MergeFileSplitRead withFilter(Predicate predicate) { return this; } + @Override + public MergeFileSplitRead withIndexFilter(Predicate indexFilter) { + this.indexFilter = indexFilter; + return this; + } + @Override public RecordReader createReader(DataSplit split) throws IOException { if (!split.beforeFiles().isEmpty()) { throw new IllegalArgumentException("This read cannot accept split with before files."); } + RecordReader reader; if (split.isStreaming()) { - return createNoMergeReader( - split.partition(), - split.bucket(), - split.dataFiles(), - split.deletionFiles().orElse(null), - split.isStreaming()); + reader = + createNoMergeReader( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + split.isStreaming()); } else { - return createMergeReader( - split.partition(), - split.bucket(), - split.dataFiles(), - split.deletionFiles().orElse(null), - forceKeepDelete); + reader = + createMergeReader( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + forceKeepDelete); } + + // if the index has been filter push down, the filter should be fallback to the 
reader + if (indexFilter != null) { + if (split.isStreaming()) { + throw new UnsupportedSchemeException( + "index should not be pushed down in streaming mode."); + } + + reader = reader.filter(row -> indexFilter.test(row.value())); + } + + return reader; } public RecordReader createMergeReader( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 46977457c4be5..eddfbfd6d9f24 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -23,6 +23,7 @@ import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fileindex.FileIndexFilterFallbackPredicateVisitor; import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.fileindex.bitmap.ApplyBitmapIndexRecordReader; import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; @@ -36,6 +37,7 @@ import org.apache.paimon.io.FileIndexEvaluator; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.partition.PartitionUtils; +import org.apache.paimon.predicate.FieldRef; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.EmptyFileRecordReader; import org.apache.paimon.reader.FileRecordReader; @@ -58,9 +60,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; @@ -80,6 +85,7 @@ public class RawFileSplitRead implements SplitRead { private RowType readRowType; @Nullable private List filters; + @Nullable private Predicate indexFilter; 
public RawFileSplitRead( FileIO fileIO, @@ -123,6 +129,12 @@ public RawFileSplitRead withFilter(Predicate predicate) { return this; } + @Override + public SplitRead withIndexFilter(@Nullable Predicate indexFilter) { + this.indexFilter = indexFilter; + return this; + } + @Override public RecordReader createReader(DataSplit split) throws IOException { if (split.beforeFiles().size() > 0) { @@ -229,10 +241,21 @@ private FileRecordReader createFileReader( fileRecordReader, (BitmapIndexResult) fileIndexResult); } + Set fields = + fileIndexResult == null ? Collections.emptySet() : fileIndexResult.applyIndexes(); + if (indexFilter != null) { + Optional fallbackPredicate = + indexFilter.visit(new FileIndexFilterFallbackPredicateVisitor(fields)); + if (fallbackPredicate.isPresent()) { + fileRecordReader = fileRecordReader.filter(fallbackPredicate.get()::test); + } + } + DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get(); if (deletionVector != null && !deletionVector.isEmpty()) { - return new ApplyDeletionVectorReader(fileRecordReader, deletionVector); + fileRecordReader = new ApplyDeletionVectorReader(fileRecordReader, deletionVector); } + return fileRecordReader; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java index 1722aa53ed9f1..5e1c5fee07aa0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java @@ -44,6 +44,8 @@ public interface SplitRead { SplitRead withFilter(@Nullable Predicate predicate); + SplitRead withIndexFilter(@Nullable Predicate predicate); + /** Create a {@link RecordReader} from split. 
*/ RecordReader createReader(DataSplit split) throws IOException; @@ -74,6 +76,12 @@ public SplitRead withFilter(@Nullable Predicate predicate) { return this; } + @Override + public SplitRead withIndexFilter(@Nullable Predicate predicate) { + read.withIndexFilter(predicate); + return this; + } + @Override public RecordReader createReader(DataSplit split) throws IOException { return convertedFactory.apply(split); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 07c0e88645ac3..f663cb3a88c09 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.fileindex.FileIndexFilterPushDownVisitor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.IndexManifestEntry; @@ -149,6 +150,13 @@ public SimpleFileReader indexManifestFileReader() { return store().indexManifestFileFactory().create(); } + @Override + public FileIndexFilterPushDownVisitor fileIndexFilterPushDownVisitor() { + return coreOptions() + .indexColumnsOptions() + .createFilterPushDownPredicateVisitor(schema().logicalRowType()); + } + @Override public String name() { return identifier().getObjectName(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 103fa64050aa2..e304ac895e991 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -120,6 +120,12 @@ protected InnerTableRead 
innerWithFilter(Predicate predicate) { return this; } + @Override + public InnerTableRead withIndexFilter(Predicate predicate) { + read.withIndexFilter(predicate); + return this; + } + @Override public void applyReadType(RowType readType) { read.withReadType(readType); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 7ed7ba48a8ebd..d5e9d9d628f82 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -21,6 +21,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.Experimental; import org.apache.paimon.annotation.Public; +import org.apache.paimon.fileindex.FileIndexFilterPushDownVisitor; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; @@ -109,6 +110,12 @@ default String uuid() { @Experimental SimpleFileReader indexManifestFileReader(); + /** Visitor to check if index can push down the predicate. */ + @Experimental + default FileIndexFilterPushDownVisitor fileIndexFilterPushDownVisitor() { + return new FileIndexFilterPushDownVisitor(); + } + /** Rollback table's state to a specific snapshot. 
*/ @Experimental void rollbackTo(long snapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java index a8f1890674668..ba6defa8856e8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java @@ -36,6 +36,13 @@ default InnerTableRead withFilter(List predicates) { InnerTableRead withFilter(Predicate predicate); + default InnerTableRead withIndexFilter(Predicate predicate) { + if (predicate == null) { + return this; + } + throw new UnsupportedOperationException(); + } + /** Use {@link #withReadType(RowType)} instead. */ @Deprecated default InnerTableRead withProjection(int[] projection) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index c62f2118df6cf..db00aed513ea5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -52,6 +52,7 @@ public final class KeyValueTableRead extends AbstractDataTableRead { @Nullable private RowType readType = null; private boolean forceKeepDelete = false; private Predicate predicate = null; + private Predicate indexPredicate = null; private IOManager ioManager = null; public KeyValueTableRead( @@ -84,6 +85,9 @@ private void assignValues(SplitRead read) { if (readType != null) { read = read.withReadType(readType); } + if (indexPredicate != null) { + read = read.withIndexFilter(indexPredicate); + } read.withFilter(predicate).withIOManager(ioManager); } @@ -107,6 +111,13 @@ protected InnerTableRead innerWithFilter(Predicate predicate) { return this; } + @Override + public InnerTableRead withIndexFilter(Predicate indexPredicate) { + initialized().forEach(r -> 
r.withIndexFilter(indexPredicate)); + this.indexPredicate = indexPredicate; + return this; + } + @Override public TableRead withIOManager(IOManager ioManager) { initialized().forEach(r -> r.withIOManager(ioManager)); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java index 0c1386ce441d8..115257a45a058 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java @@ -98,6 +98,9 @@ default ReadBuilder withFilter(List predicates) { */ ReadBuilder withFilter(Predicate predicate); + /** Push index filters, when index is empty, fallback to use it to do filter instead. */ + ReadBuilder withIndexFilter(Predicate predicate); + /** Push partition filter. */ ReadBuilder withPartitionFilter(Map partitionSpec); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java index 95bfe6f24bc77..18322067b54d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java @@ -40,6 +40,8 @@ public class ReadBuilderImpl implements ReadBuilder { private Predicate filter; + private Predicate indexFilter; + private Integer limit = null; private Integer shardIndexOfThisSubtask; @@ -81,6 +83,12 @@ public ReadBuilder withFilter(Predicate filter) { return this; } + @Override + public ReadBuilder withIndexFilter(Predicate indexFilter) { + this.indexFilter = indexFilter; + return this; + } + @Override public ReadBuilder withPartitionFilter(Map partitionSpec) { this.partitionSpec = partitionSpec; @@ -176,6 +184,9 @@ public TableRead newRead() { if (readType != null) { read.withReadType(readType); } + if (indexFilter != null) { + read.withIndexFilter(indexFilter); 
+ } return read; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java index 2e236b1dffc06..acf4be5c44896 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java @@ -83,6 +83,12 @@ public SplitRead withFilter(@Nullable Predicate predicate) { return this; } + @Override + public SplitRead withIndexFilter(@Nullable Predicate predicate) { + mergeRead.withIndexFilter(predicate); + return this; + } + @Override public RecordReader createReader(DataSplit split) throws IOException { RecordReader reader = diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index 0328cc6bada34..39e1e4c19029f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -80,6 +80,7 @@ import static org.apache.paimon.CoreOptions.BUCKET_KEY; import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD; import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE; +import static org.apache.paimon.CoreOptions.WRITE_ONLY; import static org.apache.paimon.io.DataFileTestUtils.row; import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket; import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode; @@ -702,6 +703,95 @@ public void testBSIAndBitmapIndexInDisk() throws Exception { }); } + @Test + public void testFileIndexFilterPushDownFallback() throws Exception { + RowType rowType = + RowType.builder() + .field("id", DataTypes.INT()) + .field("event", DataTypes.STRING()) + 
.field("price", DataTypes.BIGINT()) + .build(); + // in unaware-bucket mode, we split files into splits all the time + FileStoreTable table = + createUnawareBucketFileStoreTable( + rowType, + options -> { + options.set(METADATA_STATS_MODE, "NONE"); + options.set(WRITE_ONLY, true); + options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "1 B"); + }); + + // write some records without file index + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + write.write(GenericRow.of(1, BinaryString.fromString("A"), 1L)); + write.write(GenericRow.of(1, BinaryString.fromString("B"), 2L)); + write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L)); + commit.commit(0, write.prepareCommit(true, 0)); + write.close(); + + // add bitmap index and write some records + Map properties = new HashMap<>(); + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BitmapFileIndexFactory.BITMAP_INDEX + + "." + + CoreOptions.COLUMNS, + "event"); + table = table.copy(properties); + write = table.newWrite(commitUser); + write.write(GenericRow.of(1, BinaryString.fromString("A"), 1L)); + write.write(GenericRow.of(1, BinaryString.fromString("B"), 2L)); + write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L)); + commit.commit(1, write.prepareCommit(true, 1)); + write.close(); + + // add bsi index and write some records + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BitSliceIndexBitmapFileIndexFactory.BSI_INDEX + + "." 
+ + CoreOptions.COLUMNS, + "price"); + table = table.copy(properties); + write = table.newWrite(commitUser); + write.write(GenericRow.of(1, BinaryString.fromString("A"), 1L)); + write.write(GenericRow.of(1, BinaryString.fromString("B"), 2L)); + write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L)); + commit.commit(2, write.prepareCommit(true, 2)); + write.close(); + commit.close(); + + Function TO_STRING = + row -> row.getInt(0) + "," + row.getString(1).toString() + "," + row.getLong(2); + Predicate predicate = + PredicateBuilder.and( + new PredicateBuilder(rowType).equal(1, BinaryString.fromString("A")), + new PredicateBuilder(rowType).equal(2, 1L)); + TableScan.Plan plan = table.newScan().withFilter(predicate).plan(); + + // test read without index filter, should not fallback + List a1 = new ArrayList<>(); + RecordReader r1 = + table.newRead().withFilter(predicate).createReader(plan.splits()); + r1.forEachRemaining(row -> a1.add(TO_STRING.apply(row))); + assertThat(a1.size()).isEqualTo(5); + assertThat(String.join("|", a1)).isEqualTo("1,A,1|1,B,2|1,C,3|1,A,1|1,A,1"); + + // test read with index filter, should fallback + List a2 = new ArrayList<>(); + RecordReader r2 = + table.newRead() + .withFilter(predicate) + .withIndexFilter(predicate) + .createReader(plan.splits()); + r2.forEachRemaining(row -> a2.add(TO_STRING.apply(row))); + assertThat(a2.size()).isEqualTo(3); + assertThat(String.join("|", a2)).isEqualTo("1,A,1|1,A,1|1,A,1"); + } + @Test public void testWithShardAppendTable() throws Exception { FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, -1)); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 51c8b328dfc6a..636f9563ad408 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -27,6 +27,8 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.IOManagerImpl; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.BundleRecords; @@ -932,6 +934,74 @@ public void testDeletionVectorsWithBitmapFileIndexInMeta() throws Exception { "1|5|100|binary|varbinary|mapKey:mapVal|multiset")); } + @Test + public void testDeletionVectorsWithFileIndexFilterPushDownFallback() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(BUCKET, 1); + conf.set(DELETION_VECTORS_ENABLED, true); + conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1)); + conf.set(FILE_INDEX_IN_MANIFEST_THRESHOLD, MemorySize.ofBytes(1)); + }); + + StreamTableWrite write = + table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); + StreamTableCommit commit = table.newCommit(commitUser); + + // write some records without index + write.write(rowData(1, 1, 300L)); + write.write(rowData(1, 2, 400L)); + write.write(rowData(1, 3, 100L)); + write.write(rowData(1, 6, 300L)); + commit.commit(0, write.prepareCommit(true, 0)); + write.close(); + + // add bitmap index and write some records + Map properties = new HashMap<>(); + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BitmapFileIndexFactory.BITMAP_INDEX + + "." 
+ + CoreOptions.COLUMNS, + "b"); + table = table.copy(properties); + write = table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); + write.write(rowData(1, 5, 300L)); + write.write(rowData(1, 7, 200L)); + commit.commit(1, write.prepareCommit(true, 1)); + + write.write(rowData(1, 5, 100L)); + commit.commit(2, write.prepareCommit(true, 2)); + write.close(); + commit.close(); + + Predicate predicate = new PredicateBuilder(ROW_TYPE).equal(2, 100L); + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + + // test read without index filter, should not fallback + TableRead r1 = table.newRead().withFilter(predicate); + assertThat(getResult(r1, splits, BATCH_ROW_TO_STRING)) + .hasSameElementsAs( + Arrays.asList( + "1|1|100|binary|varbinary|mapKey:mapVal|multiset", + "1|2|100|binary|varbinary|mapKey:mapVal|multiset", + "1|5|100|binary|varbinary|mapKey:mapVal|multiset", + "1|4|100|binary|varbinary|mapKey:mapVal|multiset", + "1|6|300|binary|varbinary|mapKey:mapVal|multiset")); + + // test read with index filter, should fallback + TableRead r2 = table.newRead().withFilter(predicate).withIndexFilter(predicate); + assertThat(getResult(r2, splits, BATCH_ROW_TO_STRING)) + .hasSameElementsAs( + Arrays.asList( + "1|1|100|binary|varbinary|mapKey:mapVal|multiset", + "1|2|100|binary|varbinary|mapKey:mapVal|multiset", + "1|5|100|binary|varbinary|mapKey:mapVal|multiset", + "1|4|100|binary|varbinary|mapKey:mapVal|multiset")); + } + @Test public void testWithShardFirstRow() throws Exception { FileStoreTable table = diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index ee00d41832cda..09720de445122 100644 --- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -51,6 +51,7 @@ public DataTableSource( null, null, null, + null, false); } @@ -61,6 +62,7 @@ public DataTableSource( DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, + @Nullable Predicate indexPredicate, @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy watermarkStrategy, @@ -72,6 +74,7 @@ public DataTableSource( context, logStoreTableFactory, predicate, + indexPredicate, projectFields, limit, watermarkStrategy, @@ -87,6 +90,7 @@ public DataTableSource copy() { context, logStoreTableFactory, predicate, + indexPredicate, projectFields, limit, watermarkStrategy, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index 5dbbdcedd82ac..a75c2b0ec022a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -107,6 +107,7 @@ public BaseDataTableSource( DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, + @Nullable Predicate indexPredicate, @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy watermarkStrategy, @@ -117,6 +118,7 @@ public BaseDataTableSource( this.context = context; this.logStoreTableFactory = logStoreTableFactory; this.predicate = predicate; + this.indexPredicate = indexPredicate; this.projectFields = projectFields; this.limit = limit; this.watermarkStrategy = watermarkStrategy; @@ -198,6 +200,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .logSourceProvider(logSourceProvider) 
.projection(projectFields) .predicate(predicate) + .indexPredicate(indexPredicate) .limit(limit) .watermarkStrategy(watermarkStrategy) .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields()); @@ -324,6 +327,11 @@ public boolean applyAggregates( return false; } + // todo: support apply aggregate by indexes + if (indexPredicate != null) { + return false; + } + if (!aggregateExpressions .get(0) .getFunctionDefinition() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index 53a1b5f630831..4721b91e5ad29 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -70,6 +70,7 @@ public DataTableSource( null, null, null, + null, false); } @@ -80,6 +81,7 @@ public DataTableSource( DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, + @Nullable Predicate indexPredicate, @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy watermarkStrategy, @@ -92,6 +94,7 @@ public DataTableSource( context, logStoreTableFactory, predicate, + indexPredicate, projectFields, limit, watermarkStrategy, @@ -108,6 +111,7 @@ public DataTableSource copy() { context, logStoreTableFactory, predicate, + indexPredicate, projectFields, limit, watermarkStrategy, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index b3dcd4840cc1b..87b6fecdf5db1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -86,6 +86,7 @@ public class FlinkSourceBuilder { private StreamExecutionEnvironment env; @Nullable private int[][] projectedFields; @Nullable private Predicate predicate; + @Nullable private Predicate indexPredicate; @Nullable private LogSourceProvider logSourceProvider; @Nullable private Integer parallelism; @Nullable private Long limit; @@ -134,6 +135,11 @@ public FlinkSourceBuilder predicate(Predicate predicate) { return this; } + public FlinkSourceBuilder indexPredicate(Predicate indexPredicate) { + this.indexPredicate = indexPredicate; + return this; + } + public FlinkSourceBuilder limit(@Nullable Long limit) { this.limit = limit; return this; @@ -178,6 +184,9 @@ private ReadBuilder createReadBuilder() { if (limit != null) { readBuilder.withLimit(limit.intValue()); } + if (indexPredicate != null) { + readBuilder.withIndexFilter(indexPredicate); + } return readBuilder.dropStats(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index c31824180e104..be341f132bc4e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -19,15 +19,12 @@ package org.apache.paimon.flink.source; import org.apache.paimon.CoreOptions; -import org.apache.paimon.fileindex.FileIndexFilterPushDownAnalyzer; -import org.apache.paimon.fileindex.FileIndexOptions; -import org.apache.paimon.fileindex.FileIndexOptions.Column; +import org.apache.paimon.fileindex.FileIndexFilterPushDownVisitor; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.flink.PredicateConverter; import 
org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; -import org.apache.paimon.predicate.FileIndexPredicateVisitor; import org.apache.paimon.predicate.PartitionPredicateVisitor; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; @@ -51,9 +48,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX; @@ -74,6 +69,7 @@ public abstract class FlinkTableSource protected final Table table; @Nullable protected Predicate predicate; + @Nullable protected Predicate indexPredicate; @Nullable protected int[][] projectFields; @Nullable protected Long limit; protected SplitStatistics splitStatistics; @@ -98,23 +94,20 @@ public Result applyFilters(List filters) { List partitionKeys = table.partitionKeys(); RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType()); - CoreOptions options = CoreOptions.fromMap(table.options()); - boolean fileIndexReadEnabled = options.fileIndexReadEnabled(); - Map> fileIndexAnalyzers = Collections.emptyMap(); - if (fileIndexReadEnabled) { - FileIndexOptions fileIndexOptions = options.indexColumnsOptions(); - fileIndexAnalyzers = fileIndexOptions.getFileIndexFilterPushDownAnalyzer(table.rowType()); - } - // The source must ensure the consumed filters are fully evaluated, otherwise the result // of query will be wrong. List unConsumedFilters = new ArrayList<>(); List consumedFilters = new ArrayList<>(); + List indexConverted = new ArrayList<>(); List converted = new ArrayList<>(); PredicateVisitor onlyPartFieldsVisitor = new PartitionPredicateVisitor(partitionKeys); - FileIndexPredicateVisitor onlyFileIndexFieldsVisitor = - new FileIndexPredicateVisitor(fileIndexAnalyzers); + + // todo: merge-tree reader? 
+ CoreOptions options = CoreOptions.fromMap(table.options()); + boolean fileIndexReadEnabled = options.fileIndexReadEnabled(); + FileIndexFilterPushDownVisitor indexPushDownVisitor = + table.fileIndexFilterPushDownVisitor(); for (ResolvedExpression filter : filters) { Optional predicateOptional = PredicateConverter.convert(rowType, filter); @@ -125,9 +118,11 @@ public Result applyFilters(List filters) { Predicate p = predicateOptional.get(); if (isStreaming()) { unConsumedFilters.add(filter); - } else if (p.visit(onlyPartFieldsVisitor) - || (fileIndexReadEnabled && p.visit(onlyFileIndexFieldsVisitor))) { + } else if (p.visit(onlyPartFieldsVisitor)) { + consumedFilters.add(filter); + } else if (fileIndexReadEnabled && p.visit(indexPushDownVisitor)) { consumedFilters.add(filter); + indexConverted.add(p); } else { unConsumedFilters.add(filter); } @@ -135,6 +130,7 @@ public Result applyFilters(List filters) { } } predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted); + indexPredicate = indexConverted.isEmpty() ? 
null : PredicateBuilder.and(indexConverted); LOG.info("Consumed filters: {} of {}", consumedFilters, filters); return Result.of(filters, unConsumedFilters); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java index fa35b236cbab0..f5a4fa792d182 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java @@ -44,13 +44,13 @@ public class FilterPushDownITCase extends CatalogITCaseBase { @Override public List ddl() { return ImmutableList.of( - "CREATE TABLE T (a INT, b INT, c STRING)" + - " PARTITIONED BY (a)" + - " WITH (" + - "'file-index.bitmap.columns'='b'," + - "'file-index.bloom-filter.columns'='c'," + - "'file-index.read.enabled'='false'" + - ");"); + "CREATE TABLE T (a INT, b INT, c STRING)" + + " PARTITIONED BY (a)" + + " WITH (" + + "'file-index.bitmap.columns'='b'," + + "'file-index.bloom-filter.columns'='c'," + + "'file-index.read.enabled'='false'" + + ");"); } @BeforeEach @@ -74,8 +74,8 @@ public void testPartitionConditionConsuming_OnePartitionCondition() { public void testPartitionAndIndexConditionConsuming() { // enable file index read tEnv.executeSql("ALTER TABLE T SET('file-index.read.enabled'='true')"); - - // test bitmap index push down + + // bitmap index push down String s1 = "SELECT * FROM T where a = 1 AND b = 1 limit 1"; assertPlanAndResult( s1, @@ -93,22 +93,31 @@ public void testPartitionAndIndexConditionConsuming() { Row.ofKind(RowKind.INSERT, 1, 1, "1"), Row.ofKind(RowKind.INSERT, 1, 2, "2")); - // test bloom-filter index filter push down + // bloom-filter index does not support filter push down String s3 = "SELECT * FROM T where a = 1 AND c = '1' limit 1"; assertPlanAndResult( s3, - "+- Limit(offset=[0], 
fetch=[1], global=[false])\n" + - "+- Calc(select=[CAST(1 AS INTEGER) AS a, b, CAST('1' AS VARCHAR(2147483647)) AS c], where=[=(c, '1')])\n" + - "+- TableSourceScan(table=[[PAIMON, default, T, filter=[and(=(a, 1), =(c, _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], project=[b, c]]], fields=[b, c])", + "+- Limit(offset=[0], fetch=[1], global=[false])\n" + + "+- Calc(select=[CAST(1 AS INTEGER) AS a, b, CAST('1' AS VARCHAR(2147483647)) AS c], where=[=(c, '1')])\n" + + "+- TableSourceScan(table=[[PAIMON, default, T, filter=[and(=(a, 1), =(c, _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], project=[b, c]]], fields=[b, c])", Row.ofKind(RowKind.INSERT, 1, 1, "1")); // test bitmap and bloom-filter combine String s4 = "SELECT * FROM T where a = 1 AND b = 1 AND c = '1' limit 1"; assertPlanAndResult( s4, - "+- Limit(offset=[0], fetch=[1], global=[false])\n" + - "+- Calc(select=[CAST(1 AS INTEGER) AS a, CAST(1 AS INTEGER) AS b, CAST('1' AS VARCHAR(2147483647)) AS c], where=[=(c, '1')])\n" + - "+- TableSourceScan(table=[[PAIMON, default, T, filter=[and(and(=(a, 1), =(b, 1)), =(c, _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], project=[c]]], fields=[c])", + "+- Limit(offset=[0], fetch=[1], global=[false])\n" + + "+- Calc(select=[CAST(1 AS INTEGER) AS a, CAST(1 AS INTEGER) AS b, CAST('1' AS VARCHAR(2147483647)) AS c], where=[(c = '1')])\n" + + "+- TableSourceScan(table=[[PAIMON, default, T, filter=[and(and(=(a, 1), =(b, 1)), =(c, _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], project=[c]]], fields=[c])", + Row.ofKind(RowKind.INSERT, 1, 1, "1")); + + // test bitmap and bloom-filter combine without partition + String s5 = "SELECT * FROM T where b = 1 AND c = '1' limit 1"; + assertPlanAndResult( + s5, + "+- Limit(offset=[0], fetch=[1], global=[false])\n" + + "+- Calc(select=[a, CAST(1 AS INTEGER) AS b, CAST('1' AS VARCHAR(2147483647)) AS c], where=[(c = '1')])\n" + + "+- TableSourceScan(table=[[PAIMON, default, T, 
filter=[and(=(b, 1), =(c, _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], project=[a, c]]], fields=[a, c])", Row.ofKind(RowKind.INSERT, 1, 1, "1")); // disable file index read diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java index 826bf28d12484..30331228ca21d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java @@ -153,6 +153,7 @@ public void testTableFilterPartitionStatistics() throws Exception { null, null, null, + null, false); Assertions.assertThat(partitionFilterSource.reportStatistics().getRowCount()).isEqualTo(5L); Map> colStatsMap = new HashMap<>(); @@ -232,6 +233,7 @@ public void testTableFilterKeyStatistics() throws Exception { null, null, null, + null, false); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(2L); Map> colStatsMap = new HashMap<>(); @@ -311,6 +313,7 @@ public void testTableFilterValueStatistics() throws Exception { null, null, null, + null, false); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(4L); Map> colStatsMap = new HashMap<>(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java index f5d4121672b0a..a6e3d3f26aa94 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java +++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java @@ -52,6 +52,7 @@ public void testTableFilterValueStatistics() throws Exception { null, null, null, + null, false); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(9L); // TODO validate column statistics