add benchmark
Tan-JiaLiang committed Jan 3, 2025
1 parent 985a58a commit af5fa6a
Showing 9 changed files with 542 additions and 11 deletions.
@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.benchmark.deletionvectors;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.benchmark.Benchmark;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;

import org.apache.commons.math3.random.RandomDataGenerator;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Benchmark comparing table reads with and without deletion vectors push down. */
public class DeletionVectorsPushDownBenchmark {

private static final int VALUE_COUNT = 10;

private final int rowCount = 100000;
// Local warehouse directory used by the benchmark; adjust this path to your environment.
private final java.nio.file.Path tempFile = new File("D:\\paimon").toPath();

private final RandomDataGenerator random = new RandomDataGenerator();

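// Measures point-lookup reads (equality predicate on the key) for several key bounds,
// comparing the same table with and without deletion vectors push down.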
@Test
public void testParquetRead() throws Exception {
System.out.println(tempFile);
int[] bounds = new int[] {100000, 500000, 100000000};
for (int bound : bounds) {
// Run prepareData once to populate the warehouse; later runs reuse the prepared table.
// Table table = prepareData(bound, parquet(), "parquet_" + bound);
Table table = getTable("parquet_" + bound);
Map<String, Table> tables = new LinkedHashMap<>();
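// The only difference between the two variants is the deletion-vectors.push-down option.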
tables.put(
"without-dv-push-down",
table.copy(Collections.singletonMap("deletion-vectors.push-down", "false")));
tables.put(
"with-dv-push-down",
table.copy(Collections.singletonMap("deletion-vectors.push-down", "true")));

int[] values = new int[5];
for (int i = 0; i < values.length; i++) {
values[i] = random.nextInt(0, bound);
}
innerTest(tables, bound, values);
}

// TODO: verify that the select count matches with and without push down.

// TODO: verify that the equality filter results match with and without push down.

// Keep the warehouse so the prepared table can be reused across runs.
// new File(tempFile.toString()).delete();
}

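// Table options for the benchmark: a single bucket, Parquet data files, a 1 GB write
// buffer, deletion vectors enabled, and short snapshot retention.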
private Options parquet() {
Options options = new Options();
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.WRITE_BUFFER_SIZE, MemorySize.parse("1 GB"));
options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET);
options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
options.set(CoreOptions.SNAPSHOT_TIME_RETAINED, Duration.ofMinutes(1));
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 6);
return options;
}

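// Registers one benchmark case per (table variant, probe value) pair. Each case performs
// readTime full scans with an equality predicate on the key column and prints how many
// rows survived the filter.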
private void innerTest(Map<String, Table> tables, int bound, int[] values) {
int readTime = 3;
Benchmark benchmark =
new Benchmark("read", readTime * rowCount)
.setNumWarmupIters(1)
.setOutputPerIteration(true);

for (String name : tables.keySet()) {
for (int value : values) {
benchmark.addCase(
"read-" + name + "-" + bound + "-" + value,
3,
() -> {
Table table = tables.get(name);
Predicate predicate =
new PredicateBuilder(table.rowType()).equal(0, value);
for (int i = 0; i < readTime; i++) {
List<Split> splits =
table.newReadBuilder().newScan().plan().splits();
AtomicLong readCount = new AtomicLong(0);
try {
for (Split split : splits) {
// Close the reader per split to avoid leaking file handles.
try (RecordReader<InternalRow> reader =
table.newReadBuilder()
.withFilter(predicate)
.newRead()
.createReader(split)) {
reader.forEachRemaining(row -> readCount.incrementAndGet());
}
}
System.out.printf("Finished reading %d rows.%n", readCount.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
}
}
benchmark.run();
}

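// Writes 30 commits of rowCount random rows each. Keys are bounded by "bound", so smaller
// bounds cause more primary-key collisions across commits; those updates are what generate
// deletion vectors (deletion vectors are enabled in parquet()) for the read benchmark.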
private Table prepareData(int bound, Options options, String tableName) throws Exception {
Table table = createTable(options, tableName);
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
StreamTableWrite write =
(StreamTableWrite)
writeBuilder
.newWrite()
.withIOManager(new IOManagerImpl(tempFile.toString()));
StreamTableCommit commit = writeBuilder.newCommit();

for (int i = 0; i < 30; i++) {
for (int j = 0; j < rowCount; j++) {
try {
write.write(newRandomRow(bound));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
List<CommitMessage> commitMessages = write.prepareCommit(true, i);
commit.commit(i, commitMessages);
}

write.close();
commit.close();
return table;
}

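// Creates a primary-key table keyed on the int column "k" with VALUE_COUNT string value
// columns, stored in the warehouse under tempFile.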
protected Table createTable(Options tableOptions, String tableName) throws Exception {
Options catalogOptions = new Options();
catalogOptions.set(CatalogOptions.WAREHOUSE, tempFile.toUri().toString());
Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
String database = "default";
catalog.createDatabase(database, true);

List<DataField> fields = new ArrayList<>();
fields.add(new DataField(0, "k", new IntType()));
for (int i = 1; i <= VALUE_COUNT; i++) {
fields.add(new DataField(i, "f" + i, DataTypes.STRING()));
}
Schema schema =
new Schema(
fields,
Collections.emptyList(),
Collections.singletonList("k"),
tableOptions.toMap(),
"");
Identifier identifier = Identifier.create(database, tableName);
catalog.createTable(identifier, schema, false);
return catalog.getTable(identifier);
}

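// Reopens an already prepared table from the warehouse without rewriting any data.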
protected Table getTable(String tableName) throws Catalog.TableNotExistException {
Options catalogOptions = new Options();
catalogOptions.set(CatalogOptions.WAREHOUSE, tempFile.toUri().toString());
Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
return catalog.getTable(Identifier.create("default", tableName));
}

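// Builds a row with a random key bounded by "bound" and VALUE_COUNT random 10-character
// hex strings as values.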
protected InternalRow newRandomRow(int bound) {
GenericRow row = new GenericRow(1 + VALUE_COUNT);
row.setField(0, random.nextInt(0, bound));
for (int i = 1; i <= VALUE_COUNT; i++) {
row.setField(i, BinaryString.fromString(random.nextHexString(10)));
}
return row;
}
}