From cad67f202ba9addfeedb927d9847acf07041c11c Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Mon, 2 Dec 2024 20:34:40 +0800
Subject: [PATCH] fix

---
 .../datasource/hive/HMSExternalTable.java     |  19 ++
 .../hive/HiveMetaStoreClientHelper.java       |   5 +-
 .../datasource/hudi/HudiSchemaCacheValue.java |  40 +++++++
 .../doris/datasource/hudi/HudiUtils.java      |   9 ++
 .../datasource/hudi/source/HudiScanNode.java  | 101 ++++++++++++++----
 5 files changed, 150 insertions(+), 24 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 1c30fa24cfb51e7..52f7779f2c5f686 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -69,6 +69,7 @@
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -114,6 +115,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf {
 
     private static final String USE_HIVE_SYNC_PARTITION = "use_hive_sync_partition";
 
+    // Lazily initialized Hudi meta client; volatile so the double-checked locking in getHudiClient() is safe.
+    private volatile HoodieTableMetaClient hudiClient = null;
+    private final byte[] hudiClientLock = new byte[0];
+
     static {
         SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
         SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
@@ -986,4 +991,18 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
         Env.getCurrentEnv().getRefreshManager()
                 .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
     }
+
+    public HoodieTableMetaClient getHudiClient() {
+        if (hudiClient != null) {
+            return hudiClient;
+        }
+        synchronized (hudiClientLock) {
+            if (hudiClient != null) {
+                return hudiClient;
+            }
+            hudiClient = HudiUtils.buildHudiTableMetaClient(
+                    getRemoteTable().getSd().getLocation(), catalog.getConfiguration());
+            return hudiClient;
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 97032467cec765f..21e284685fa81e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -826,10 +826,7 @@ public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) {
     }
 
     public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
-        String hudiBasePath = table.getRemoteTable().getSd().getLocation();
-        Configuration conf = getConfiguration(table);
-        return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf),
-                () -> HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build());
+        return table.getHudiClient();
     }
 
     public static Configuration getConfiguration(HMSExternalTable table) {
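A note on HMSExternalTable.getHudiClient() above: it relies on double-checked locking, which is only correct when the cached field is volatile, hence the volatile on the hudiClient field. Below is a minimal, self-contained sketch of the idiom; the ExpensiveClient and LazyClientHolder classes are hypothetical stand-ins, not part of this patch:

    // Double-checked locking: an unsynchronized volatile read on the hot path,
    // synchronized construction only the first time through.
    class ExpensiveClient {
        ExpensiveClient() {
            // stands in for HoodieTableMetaClient.builder()...build(), which
            // reads the table's timeline from storage and is costly to repeat
        }
    }

    public class LazyClientHolder {
        // volatile: fast-path readers must observe a fully constructed object
        private volatile ExpensiveClient client = null;
        private final Object lock = new Object();

        public ExpensiveClient get() {
            ExpensiveClient local = client;   // single volatile read
            if (local == null) {
                synchronized (lock) {
                    if (client == null) {     // re-check under the lock
                        client = new ExpensiveClient();
                    }
                    local = client;
                }
            }
            return local;
        }
    }
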
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
new file mode 100644
index 000000000000000..8c58ffa2006f166
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheValue.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.hive.HMSSchemaCacheValue;
+
+import java.util.List;
+
+public class HudiSchemaCacheValue extends HMSSchemaCacheValue {
+
+    private List<String> colTypes;
+
+    public HudiSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) {
+        super(schema, partitionColumns);
+    }
+
+    public List<String> getColTypes() {
+        return colTypes;
+    }
+
+    public void setColTypes(List<String> colTypes) {
+        this.colTypes = colTypes;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
index d7803b1a516f9e8..2481ca7d10234bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
@@ -23,11 +23,15 @@
 import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopUGI;
 
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 
@@ -231,4 +235,9 @@ private static Type handleUnionType(Schema avroSchema) {
         }
         return Type.UNSUPPORTED;
     }
+
+    public static HoodieTableMetaClient buildHudiTableMetaClient(String hudiBasePath, Configuration conf) {
+        return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf),
+                () -> HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build());
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index a8f2a362bfde8d0..d5700ec5daeedab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -20,20 +20,23 @@
 import org.apache.doris.analysis.TableScanParams;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.TablePartitionValues;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
 import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.source.HiveScanNode;
-import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
@@ -47,21 +50,23 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -71,7 +76,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -82,6 +86,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class HudiScanNode extends HiveScanNode {
 
@@ -113,6 +118,7 @@ public class HudiScanNode extends HiveScanNode {
     private boolean incrementalRead = false;
     private TableScanParams scanParams;
     private IncrementalRelation incrementalRelation;
+    private HoodieTableFileSystemView partitionFileSystemView;
 
     /**
      * External file scan node for Query Hudi table
@@ -165,20 +171,11 @@ protected void doInitialize() throws UserException {
         basePath = hmsTable.getRemoteTable().getSd().getLocation();
         inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
         serdeLib = hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib();
-        columnNames = new ArrayList<>();
-        columnTypes = new ArrayList<>();
-        TableSchemaResolver schemaUtil = new TableSchemaResolver(hudiClient);
-        Schema hudiSchema;
-        try {
-            hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
-        } catch (Exception e) {
-            throw new UserException("Cannot get hudi table schema.");
-        }
-        for (Schema.Field hudiField : hudiSchema.getFields()) {
-            columnNames.add(hudiField.name().toLowerCase(Locale.ROOT));
-            String columnType = HudiUtils.convertAvroToHiveType(hudiField.schema());
-            columnTypes.add(columnType);
-        }
+        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(table.getCatalog());
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(table.getDbName(), table.getName());
+        HudiSchemaCacheValue hudiSchemaCacheValue = (HudiSchemaCacheValue) schemaCacheValue.get();
+        columnNames = hudiSchemaCacheValue.getSchema().stream().map(Column::getName).collect(Collectors.toList());
+        columnTypes = hudiSchemaCacheValue.getColTypes();
 
         if (scanParams != null && !scanParams.incrementalRead()) {
             // Only support incremental read
@@ -221,8 +218,28 @@ protected void doInitialize() throws UserException {
             queryInstant = snapshotInstant.get().getTimestamp();
             snapshotTimestamp = Option.empty();
         }
+        partitionFileSystemView = getFsView();
+    }
+
+    private HoodieTableFileSystemView getFsView() {
+        // Use the Hudi metadata table for file listings when its "files" partition
+        // is available, and keep the resulting file system view in memory.
+        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+                .enable(HoodieTableMetadataUtil.isFilesPartitionAvailable(hudiClient))
+                .build();
+        FileSystemViewStorageConfig viewConf = FileSystemViewStorageConfig.newBuilder()
+                .withStorageType(FileSystemViewStorageType.MEMORY)
+                .withIncrementalTimelineSync(false)
+                .build();
+        FileSystemViewManager viewMgr = FileSystemViewManager.createViewManagerWithTableMetadata(
+                new HudiLocalEngineContext(hudiClient.getHadoopConf()),
+                metadataConfig,
+                viewConf,
+                null
+        );
+        return (HoodieTableFileSystemView) viewMgr.getFileSystemView(hudiClient);
     }
 
     @Override
     protected Map<String, String> getLocationProperties() throws UserException {
         if (incrementalRead) {
@@ -328,7 +349,7 @@ private List<Split> getIncrementalSplits() {
                 incrementalRelation.getEndTs())).collect(Collectors.toList());
     }
 
-    private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException {
+    private void getPartitionSplits2(HivePartition partition, List<Split> splits) throws IOException {
         String globPath;
         String partitionName;
         if (partition.isDummyPartition()) {
@@ -361,6 +382,46 @@ private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException {
         }
     }
 
+    private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException {
+        if (isCowOrRoTable) {
+            // COW or read-optimized read: the latest base files up to the query instant are enough.
+            Stream<HoodieBaseFile> latestBaseFilesBeforeOrOn;
+            if (partition.isDummyPartition()) {
+                Map<String, Stream<HoodieBaseFile>> allLatestBaseFilesBeforeOrOn = partitionFileSystemView.getAllLatestBaseFilesBeforeOrOn(queryInstant);
+                latestBaseFilesBeforeOrOn = allLatestBaseFilesBeforeOrOn.get("");
+            } else {
+                String partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+                        new Path(partition.getPath()));
+                latestBaseFilesBeforeOrOn = partitionFileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant);
+            }
+            latestBaseFilesBeforeOrOn.forEach(baseFile -> {
+                noLogsSplitNum.incrementAndGet();
+                String filePath = baseFile.getPath();
+                long fileSize = baseFile.getFileSize();
+                // Need to add hdfs host to location
+                LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
+                splits.add(new FileSplit(locationPath, 0, fileSize, fileSize, 0,
+                        new String[0], partition.getPartitionValues()));
+            });
+        } else {
+            // MOR snapshot read: merge each base file with its log files up to the query instant.
+            Stream<FileSlice> fileSliceStream;
+
+            if (partition.isDummyPartition()) {
+                Map<String, Stream<FileSlice>> allLatestFileSlicesBeforeOrOn = partitionFileSystemView.getAllLatestFileSlicesBeforeOrOn(queryInstant);
+                fileSliceStream = allLatestFileSlicesBeforeOrOn.get("");
+            } else {
+                String partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+                        new Path(partition.getPath()));
+                fileSliceStream = partitionFileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant);
+            }
+
+            fileSliceStream
+                    .forEach(fileSlice -> splits.add(
+                            generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
+        }
+    }
+
     private void getPartitionsSplits(List<HivePartition> partitions, List<Split> splits) {
         Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
         CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
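One thing the hunks above do not show is where HudiSchemaCacheValue.setColTypes() is called: HudiScanNode.doInitialize() now assumes the schema-loading path puts a fully populated HudiSchemaCacheValue into the ExternalSchemaCache. Below is a minimal sketch of that expected population step; the method name and call site are assumptions and not part of this patch, while convertAvroToHiveType and fromAvroHudiTypeToDorisType refer to the existing HudiUtils conversion helpers (the former is the one the deleted HudiScanNode code previously invoked per query):

    // Hypothetical schema-loading step, e.g. in HMSExternalTable's schema init:
    private Optional<SchemaCacheValue> initHudiSchemaCacheValue(Schema hudiSchema,
            List<Column> partitionColumns) {
        List<Column> columns = new ArrayList<>();
        List<String> colTypes = new ArrayList<>();
        for (Schema.Field field : hudiSchema.getFields()) {
            // A Doris column for planning, plus the raw Hive type string that
            // HudiScanNode.doInitialize() later reads back via getColTypes()
            columns.add(new Column(field.name().toLowerCase(Locale.ROOT),
                    HudiUtils.fromAvroHudiTypeToDorisType(field.schema()), true));
            colTypes.add(HudiUtils.convertAvroToHiveType(field.schema()));
        }
        HudiSchemaCacheValue value = new HudiSchemaCacheValue(columns, partitionColumns);
        value.setColTypes(colTypes);
        return Optional.of(value);
    }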