Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwenchi committed Feb 12, 2025
1 parent bf96eca commit fda6876
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -145,6 +146,24 @@ public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblNam
}
}

/**
 * Lists the partitions of a Paimon table through the underlying Paimon catalog.
 *
 * <p>The lookup runs inside {@code hadoopAuthenticator.doAs}. A missing table is treated as
 * "no partitions": the {@code TableNotExistException} is logged and an empty list is returned
 * instead of being propagated.
 *
 * @param dbName  name of the database that holds the table
 * @param tblName name of the table whose partitions are listed
 * @return the partitions reported by the catalog, or an empty mutable list if the table does not exist
 * @throws RuntimeException if the authenticated call fails with an {@link IOException}
 */
public List<Partition> getPaimonPartitions(String dbName, String tblName) {
    makeSureInitialized();
    try {
        return hadoopAuthenticator.doAs(() -> {
            try {
                return catalog.listPartitions(Identifier.create(dbName, tblName));
            } catch (Catalog.TableNotExistException e) {
                // Deliberate best effort: a table that vanished is reported as having no partitions.
                // Log which table it was so the warning can actually be acted upon.
                LOG.warn("Paimon table {}.{}.{} does not exist, returning no partitions",
                        getName(), dbName, tblName, e);
                return new ArrayList<Partition>();
            }
        });
    } catch (IOException e) {
        throw new RuntimeException("Failed to get Paimon table partitions:" + getName() + "."
                + dbName + "." + tblName + ", because " + e.getMessage(), e);
    }
}

protected String getPaimonCatalogType(String catalogType) {
if (PAIMON_HMS.equalsIgnoreCase(catalogType)) {
return PaimonProperties.PAIMON_HMS_CATALOG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.system.SchemasTable;
import org.apache.paimon.types.DataField;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -93,9 +90,13 @@ protected synchronized void makeSureInitialized() {
}

/**
 * Returns the Paimon table to use for the given MVCC snapshot.
 *
 * <p>The snapshot id is taken from {@code getOrFetchSnapshotCacheValue(snapshot)}. When it equals
 * {@code PaimonSnapshot.INVALID_SNAPSHOT_ID}, {@code paimonTable} is returned as is; otherwise a
 * copy of {@code paimonTable} is returned with the {@code CoreOptions.SCAN_VERSION} option set to
 * that snapshot id.
 *
 * @param snapshot optional MVCC snapshot passed through to the snapshot cache lookup
 * @return {@code paimonTable} itself, or a copy pinned to the resolved snapshot id
 */
public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
long snapshotId = getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId();
// No usable snapshot id: hand back the table without a scan-version override.
if (snapshotId == PaimonSnapshot.INVALID_SNAPSHOT_ID) {
return paimonTable;
}
return paimonTable.copy(
Collections.singletonMap(CoreOptions.SCAN_VERSION.key(),
String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId())));
String.valueOf(snapshotId)));
}

public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) {
Expand Down Expand Up @@ -194,12 +195,12 @@ private boolean isPartitionInvalid(Optional<MvccSnapshot> snapshot) {
/**
 * Returns a timestamp-based MTMV snapshot for one partition of this table.
 *
 * <p>The partition is looked up by name in the name-to-partition map of the snapshot cache value
 * obtained from {@code getOrFetchSnapshotCacheValue(snapshot)}; its timestamp is wrapped in an
 * {@code MTMVTimestampSnapshot}.
 *
 * @param partitionName name of the partition to look up
 * @param context       refresh context; not read by the code visible in this method
 * @param snapshot      optional MVCC snapshot passed through to the snapshot cache lookup
 * @return a {@code MTMVTimestampSnapshot} built from the partition's timestamp
 * @throws AnalysisException if no partition with the given name is found
 */
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
throws AnalysisException {
PaimonPartition paimonPartition = getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition()
Partition paimonPartition = getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition()
.get(partitionName);
if (paimonPartition == null) {
throw new AnalysisException("can not find partition: " + partitionName);
}
return new MTMVTimestampSnapshot(paimonPartition.getLastUpdateTime());
return new MTMVTimestampSnapshot(paimonPartition.lastFileCreationTime());
}

@Override
Expand Down Expand Up @@ -244,10 +245,11 @@ public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
makeSureInitialized();
PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key;
try {
PaimonSchema schema = loadPaimonSchemaBySchemaId(paimonSchemaCacheKey);
List<DataField> columns = schema.getFields();
Table table = ((PaimonExternalCatalog) getCatalog()).getPaimonTable(key.getDbName(), name);
TableSchema tableSchema = ((DataTable) table).schemaManager().schema(paimonSchemaCacheKey.getSchemaId());
List<DataField> columns = tableSchema.fields();
List<Column> dorisColumns = Lists.newArrayListWithCapacity(columns.size());
Set<String> partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys());
Set<String> partitionColumnNames = Sets.newHashSet(tableSchema.partitionKeys());
List<Column> partitionColumns = Lists.newArrayList();
for (DataField field : columns) {
Column column = new Column(field.name().toLowerCase(),
Expand All @@ -267,23 +269,6 @@ null, getCatalog().getName(), key.getDbName(), key.getTblName(),

}

/**
 * Finds one schema version of this table by reading the table's {@code schemas} system table.
 *
 * @param key cache key that supplies the database name and the schema id to look for
 * @return the first schema read whose id equals {@code key.getSchemaId()}
 * @throws IOException declared for the underlying catalog/table access
 * @throws CacheException if none of the rows read carries the requested schema id
 */
private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException {
    PaimonExternalCatalog paimonCatalog = (PaimonExternalCatalog) getCatalog();
    Table schemasTable = paimonCatalog.getPaimonTable(key.getDbName(),
            name + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS);
    Predicate schemaIdEquals = new PredicateBuilder(schemasTable.rowType()).equal(0, key.getSchemaId());
    int[] projectedColumns = {0, 1, 2};
    // Even with the predicate the read can return rows for other schema ids,
    // so each row is checked again before it is accepted.
    return PaimonUtil.read(schemasTable, projectedColumns, schemaIdEquals).stream()
            .map(PaimonUtil::rowToSchema)
            .filter(candidate -> candidate.getSchemaId() == key.getSchemaId())
            .findFirst()
            .orElseThrow(() -> new CacheException("failed to initSchema for: %s.%s.%s.%s",
                    null, getCatalog().getName(), key.getDbName(), key.getTblName(), key.getSchemaId()));
}

private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
PaimonSnapshotCacheValue snapshotCacheValue = getOrFetchSnapshotCacheValue(snapshot);
return getPaimonSchemaCacheValue(snapshotCacheValue.getSnapshot().getSchemaId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,10 @@
import org.apache.doris.datasource.ExternalMetaCacheMgr;

import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.PartitionsTable;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
Expand Down Expand Up @@ -76,26 +73,15 @@ private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List<C
if (CollectionUtils.isEmpty(partitionColumns)) {
return new PaimonPartitionInfo();
}
List<PaimonPartition> paimonPartitions = loadPartitions(key);
List<Partition> paimonPartitions = ((PaimonExternalCatalog) key.getCatalog())
.getPaimonPartitions(key.getDbName(), key.getTableName());
return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions);
}

/**
 * Reads every row of the table's {@code partitions} system table and converts each row into a
 * {@code PaimonPartition}.
 *
 * @param key cache key that supplies the catalog, database name and table name
 * @return one {@code PaimonPartition} per row read, in read order
 * @throws IOException declared for the underlying catalog/table access
 */
private List<PaimonPartition> loadPartitions(PaimonSnapshotCacheKey key)
        throws IOException {
    PaimonExternalCatalog paimonCatalog = (PaimonExternalCatalog) key.getCatalog();
    Table partitionsTable = paimonCatalog.getPaimonTable(key.getDbName(),
            key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS);
    // No projection and no predicate: all columns of all rows are read.
    List<InternalRow> partitionRows = PaimonUtil.read(partitionsTable, null, null);
    List<PaimonPartition> partitions = Lists.newArrayListWithCapacity(partitionRows.size());
    partitionRows.forEach(partitionRow -> partitions.add(PaimonUtil.rowToPartition(partitionRow)));
    return partitions;
}

private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException {
Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), key.getTableName());
// snapshotId and schemaId
long latestSnapshotId = 0L;
long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
long latestSchemaId = 0L;
OptionalLong optionalSnapshotId = table.latestSnapshotId();
if (optionalSnapshotId.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@
import org.apache.doris.catalog.PartitionItem;

import com.google.common.collect.Maps;
import org.apache.paimon.partition.Partition;

import java.util.Map;

public class PaimonPartitionInfo {
private final Map<String, PartitionItem> nameToPartitionItem;
private final Map<String, PaimonPartition> nameToPartition;
private final Map<String, Partition> nameToPartition;

/**
 * Creates an empty partition info: both name-keyed maps start out as new, empty hash maps.
 */
public PaimonPartitionInfo() {
    this.nameToPartition = Maps.newHashMap();
    this.nameToPartitionItem = Maps.newHashMap();
}

/**
 * Creates a partition info backed by the given maps.
 *
 * <p>Both maps are stored by reference; no copy is made.
 *
 * @param nameToPartitionItem partition name to Doris {@code PartitionItem}
 * @param nameToPartition     partition name to Paimon partition metadata
 */
public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
Map<String, PaimonPartition> nameToPartition) {
Map<String, Partition> nameToPartition) {
this.nameToPartitionItem = nameToPartitionItem;
this.nameToPartition = nameToPartition;
}
Expand All @@ -42,7 +43,7 @@ public Map<String, PartitionItem> getNameToPartitionItem() {
return nameToPartitionItem;
}

/**
 * Returns the map from partition name to Paimon partition metadata.
 *
 * <p>The internal map itself is returned, not a copy.
 */
public Map<String, PaimonPartition> getNameToPartition() {
public Map<String, Partition> getNameToPartition() {
return nameToPartition;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.datasource.paimon;

public class PaimonSnapshot {
/** Sentinel snapshot id used when no real snapshot id is available; final so it cannot be reassigned. */
public static final long INVALID_SNAPSHOT_ID = -1;
private final long snapshotId;
private final long schemaId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,16 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Projection;

Expand Down Expand Up @@ -91,71 +90,39 @@ public static List<InternalRow> read(
return rows;
}


/*
https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table
+---------------+----------------+--------------------+--------------------+------------------------+
| partition | record_count | file_size_in_bytes| file_count| last_update_time|
+---------------+----------------+--------------------+--------------------+------------------------+
| [1] | 1 | 645 | 1 | 2024-06-24 10:25:57.400|
+---------------+----------------+--------------------+--------------------+------------------------+
org.apache.paimon.table.system.PartitionsTable.TABLE_TYPE
public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new DataField(0, "partition", SerializationUtils.newStringType(true)),
new DataField(1, "record_count", new BigIntType(false)),
new DataField(2, "file_size_in_bytes", new BigIntType(false)),
new DataField(3, "file_count", new BigIntType(false)),
new DataField(4, "last_update_time", DataTypes.TIMESTAMP_MILLIS())));
*/
/**
 * Converts one row of the Paimon {@code partitions} system table into a {@code PaimonPartition}.
 *
 * <p>Columns are read by position: 0 = partition (string), 1 = record count, 2 = file size in
 * bytes, 3 = file count, 4 = last update time (timestamp read with precision 3, converted to
 * epoch milliseconds).
 *
 * @param row a row of the partitions system table
 * @return the partition described by that row
 */
public static PaimonPartition rowToPartition(InternalRow row) {
    return new PaimonPartition(
            row.getString(0).toString(),
            row.getLong(1),
            row.getLong(2),
            row.getLong(3),
            row.getTimestamp(4, 3).getMillisecond());
}

/**
 * Builds a {@code PaimonPartitionInfo} from the partition columns and the partitions reported by Paimon.
 *
 * <p>Two maps keyed by partition name are filled: name to Paimon partition, and name to Doris
 * {@code PartitionItem} (via {@code toListPartitionItem}). An empty result is returned when there
 * are no partition columns or no partitions.
 *
 * <p>A partition is always added to the name-to-partition map; it is added to the
 * name-to-partition-item map only if {@code toListPartitionItem} succeeds, so the two maps can
 * differ in their key sets.
 *
 * @param partitionColumns partition columns of the table
 * @param paimonPartitions partitions reported by Paimon
 * @return partition info wrapping both maps
 */
public static PaimonPartitionInfo generatePartitionInfo(List<Column> partitionColumns,
List<PaimonPartition> paimonPartitions) {
List<Partition> paimonPartitions) {

Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
Map<String, PaimonPartition> nameToPartition = Maps.newHashMap();
Map<String, Partition> nameToPartition = Maps.newHashMap();
// partitionInfo holds references to the two maps above, so later puts are visible through it.
PaimonPartitionInfo partitionInfo = new PaimonPartitionInfo(nameToPartitionItem, nameToPartition);
if (CollectionUtils.isEmpty(partitionColumns)) {
if (CollectionUtils.isEmpty(partitionColumns) || paimonPartitions.isEmpty()) {
return partitionInfo;
}
for (PaimonPartition paimonPartition : paimonPartitions) {
String partitionName = getPartitionName(partitionColumns, paimonPartition.getPartitionValues());
nameToPartition.put(partitionName, paimonPartition);

for (Partition partition : paimonPartitions) {
// Partition name has the form "k1=v1/k2=v2", built from the entries of partition.spec().
// NOTE(review): the key order follows spec.entrySet() iteration order, not partitionColumns;
// presumably spec() iterates in partition-column order - confirm.
Map<String, String> spec = partition.spec();
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : spec.entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append("/");
}
// Drop the trailing "/" appended after the last entry.
if (sb.length() > 0) {
sb.deleteCharAt(sb.length() - 1);
}
String partitionName = sb.toString();
nameToPartition.put(partitionName, partition);
try {
// Partition values returned by the Paimon API may be problematic;
// exceptions are caught here so that they do not affect the query.
nameToPartitionItem.put(partitionName, toListPartitionItem(partitionName, partitionColumns));
} catch (Exception e) {
LOG.warn("toListPartitionItem failed, partitionColumns: {}, partitionValues: {}", partitionColumns,
paimonPartition.getPartitionValues(), e);
LOG.warn("toListPartitionItem failed, partitionColumns: {}, partitionValues: {}",
partitionColumns, partition.spec(), e);
}
}
return partitionInfo;
}

/**
 * Builds a partition name of the form {@code col1=v1/col2=v2} from the partition columns and a
 * bracketed, comma-separated value string.
 *
 * <p>All {@code [} and {@code ]} characters are removed from the value string before it is split
 * on {@code ","}; the values are used exactly as split (no trimming).
 *
 * @param partitionColumns  partition columns, in the order their values appear
 * @param partitionValueStr partition values as a string; must not be null
 * @return the partition name
 * @throws NullPointerException  if {@code partitionValueStr} is null
 * @throws IllegalStateException if the number of values differs from the number of columns
 */
private static String getPartitionName(List<Column> partitionColumns, String partitionValueStr) {
    Preconditions.checkNotNull(partitionValueStr);
    String withoutBrackets = partitionValueStr.replace("[", "").replace("]", "");
    String[] values = withoutBrackets.split(",");
    Preconditions.checkState(partitionColumns.size() == values.length);
    StringBuilder name = new StringBuilder();
    int position = 0;
    for (Column partitionColumn : partitionColumns) {
        if (position > 0) {
            name.append("/");
        }
        name.append(partitionColumn.getName()).append("=").append(values[position]);
        position++;
    }
    return name.toString();
}

public static ListPartitionItem toListPartitionItem(String partitionName, List<Column> partitionColumns)
throws AnalysisException {
List<Type> types = partitionColumns.stream()
Expand Down Expand Up @@ -253,32 +220,4 @@ private static Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataT
/**
 * Converts a Paimon data type to the corresponding Doris type by delegating to
 * {@code paimonPrimitiveTypeToDorisType}.
 *
 * @param type the Paimon data type
 * @return the Doris type produced by the delegate
 */
public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) {
    Type dorisType = paimonPrimitiveTypeToDorisType(type);
    return dorisType;
}

/**
 * Converts one row of the Paimon {@code schemas} system table into a {@code PaimonSchema}.
 *
 * <p>See https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table
 *
 * <p>Example row (one column per line):
 * <pre>
 * 0
 * [{"id":0,"name":"user_id","type":"BIGINT NOT NULL"},
 * {"id":1,"name":"item_id","type":"BIGINT"},
 * {"id":2,"name":"behavior","type":"STRING"},
 * {"id":3,"name":"dt","type":"STRING NOT NULL"},
 * {"id":4,"name":"hh","type":"STRING NOT NULL"}]
 * ["dt"]
 * ["dt","hh","user_id"]
 * {"owner":"hadoop","provider":"paimon"}
 * 2024-12-03 15:38:14.734
 * </pre>
 *
 * <p>Only the first three columns are read: 0 = schema id, 1 = fields as JSON,
 * 2 = partition keys as JSON.
 *
 * @param row a row of the schemas system table
 * @return the schema id, fields and partition keys carried by that row
 */
public static PaimonSchema rowToSchema(InternalRow row) {
    TypeReference<List<DataField>> fieldListType = new TypeReference<List<DataField>>() {
    };
    TypeReference<List<String>> keyListType = new TypeReference<List<String>>() {
    };
    long id = row.getLong(0);
    String fieldsJson = row.getString(1).toString();
    String partitionKeysJson = row.getString(2).toString();
    List<DataField> parsedFields = JsonSerdeUtil.fromJson(fieldsJson, fieldListType);
    List<String> parsedPartitionKeys = JsonSerdeUtil.fromJson(partitionKeysJson, keyListType);
    return new PaimonSchema(id, parsedFields, parsedPartitionKeys);
}
}
Loading

0 comments on commit fda6876

Please sign in to comment.