Commit cad67f2

fix

wuwenchi committed Dec 2, 2024
1 parent 6ce0208 commit cad67f2

Showing 5 changed files with 150 additions and 24 deletions.
@@ -69,6 +69,7 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -114,6 +115,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI

private static final String USE_HIVE_SYNC_PARTITION = "use_hive_sync_partition";

    // Lazily initialized by getHudiClient(); volatile so the unsynchronized fast-path read is safe.
    private volatile HoodieTableMetaClient hudiClient = null;
    private final byte[] hudiClientLock = new byte[0];


static {
SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
@@ -986,4 +991,18 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
Env.getCurrentEnv().getRefreshManager()
.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
}

    public HoodieTableMetaClient getHudiClient() {
        if (hudiClient != null) {
            return hudiClient;
        }
        synchronized (hudiClientLock) {
            if (hudiClient != null) {
                return hudiClient;
            }
            hudiClient = HudiUtils.buildHudiTableMetaClient(
                    getRemoteTable().getSd().getLocation(), catalog.getConfiguration());
            return hudiClient;
        }
    }
}
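For context, a minimal usage sketch (not part of this commit) of how a caller can use the cached meta client, for example to resolve the table's Avro write schema. The wrapper class and method are illustrative; the Hudi calls mirror the ones this change removes from HudiScanNode's doInitialize.

import org.apache.avro.Schema;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;

public class HudiMetaClientUsageSketch {
    // Resolve the Hudi write schema through the table-level cached meta client.
    public static Schema resolveWriteSchema(HMSExternalTable table) throws Exception {
        HoodieTableMetaClient metaClient = table.getHudiClient(); // built once, reused afterwards
        TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
        return HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
    }
}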
@@ -826,10 +826,7 @@ public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> act
}

    public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
-        String hudiBasePath = table.getRemoteTable().getSd().getLocation();
-        Configuration conf = getConfiguration(table);
-        return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf),
-                () -> HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build());
+        return table.getHudiClient();
    }

public static Configuration getConfiguration(HMSExternalTable table) {
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hudi;

import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hive.HMSSchemaCacheValue;

import java.util.List;

public class HudiSchemaCacheValue extends HMSSchemaCacheValue {

    private List<String> colTypes;

    public HudiSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) {
        super(schema, partitionColumns);
    }

    public List<String> getColTypes() {
        return colTypes;
    }

    public void setColTypes(List<String> colTypes) {
        this.colTypes = colTypes;
    }
}
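As a hedged sketch of intent (not code from this commit), the cache value above would presumably be filled on a schema-cache miss by deriving the Hive-style type strings once from the table's Avro schema, the same work the old HudiScanNode code did per query. The loader class and method below are hypothetical; the conversion calls are the ones used elsewhere in this change.

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.TableSchemaResolver;

public class HudiSchemaCacheLoaderSketch {
    // Hypothetical helper: build the cache value, assuming the Doris columns were already converted.
    public static HudiSchemaCacheValue load(HMSExternalTable table, List<Column> schema,
            List<Column> partitionColumns) throws Exception {
        TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getHudiClient());
        Schema hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
        List<String> colTypes = new ArrayList<>();
        for (Schema.Field field : hudiSchema.getFields()) {
            // Same Avro-to-Hive type conversion the scan node previously ran on every query.
            colTypes.add(HudiUtils.convertAvroToHiveType(field.schema()));
        }
        HudiSchemaCacheValue value = new HudiSchemaCacheValue(schema, partitionColumns);
        value.setColTypes(colTypes);
        return value;
    }
}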
@@ -23,11 +23,15 @@
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopUGI;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;

@@ -231,4 +235,9 @@ private static Type handleUnionType(Schema avroSchema) {
}
return Type.UNSUPPORTED;
}

    public static HoodieTableMetaClient buildHudiTableMetaClient(String hudiBasePath, Configuration conf) {
        return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf),
                () -> HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build());
    }
}
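As a usage note (again not part of this commit), a small sketch of how the meta client returned by buildHudiTableMetaClient is typically consumed, e.g. to pick the latest completed commit as a query instant. The wrapper class is illustrative; the timeline calls follow Hudi's public HoodieTimeline API.

import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

public class HudiTimelineSketch {
    // Return the timestamp of the newest completed commit, if any.
    public static Option<String> latestCommitTime(String basePath, Configuration conf) {
        HoodieTableMetaClient metaClient = HudiUtils.buildHudiTableMetaClient(basePath, conf);
        Option<HoodieInstant> lastInstant =
                metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant();
        return lastInstant.map(HoodieInstant::getTimestamp);
    }
}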
@@ -20,20 +20,23 @@
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
@@ -47,21 +50,23 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -71,7 +76,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -82,6 +86,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HudiScanNode extends HiveScanNode {

@@ -113,6 +118,7 @@ public class HudiScanNode extends HiveScanNode {
private boolean incrementalRead = false;
private TableScanParams scanParams;
private IncrementalRelation incrementalRelation;
private HoodieTableFileSystemView partitionFileSystemView;

/**
* External file scan node for Query Hudi table
@@ -165,20 +171,11 @@ protected void doInitialize() throws UserException {
basePath = hmsTable.getRemoteTable().getSd().getLocation();
inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
serdeLib = hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib();
-        columnNames = new ArrayList<>();
-        columnTypes = new ArrayList<>();
-        TableSchemaResolver schemaUtil = new TableSchemaResolver(hudiClient);
-        Schema hudiSchema;
-        try {
-            hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
-        } catch (Exception e) {
-            throw new UserException("Cannot get hudi table schema.");
-        }
-        for (Schema.Field hudiField : hudiSchema.getFields()) {
-            columnNames.add(hudiField.name().toLowerCase(Locale.ROOT));
-            String columnType = HudiUtils.convertAvroToHiveType(hudiField.schema());
-            columnTypes.add(columnType);
-        }
+        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(table.getCatalog());
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(table.getDbName(), table.getName());
+        HudiSchemaCacheValue hudiSchemaCacheValue = (HudiSchemaCacheValue) schemaCacheValue.get();
+        columnNames = hudiSchemaCacheValue.getSchema().stream().map(Column::getName).collect(Collectors.toList());
+        columnTypes = hudiSchemaCacheValue.getColTypes();

if (scanParams != null && !scanParams.incrementalRead()) {
// Only support incremental read
@@ -221,8 +218,32 @@ protected void doInitialize() throws UserException {
queryInstant = snapshotInstant.get().getTimestamp();
snapshotTimestamp = Option.empty();
}
partitionFileSystemView = getFsView();
}

    private HoodieTableFileSystemView getFsView() {
        // Use the Hudi metadata table for file listing only when its files partition is available.
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
                .enable(HoodieTableMetadataUtil.isFilesPartitionAvailable(hudiClient))
                .build();
        // Keep the file system view in memory; incremental timeline sync is not needed for a single query.
        FileSystemViewStorageConfig viewConf = FileSystemViewStorageConfig.newBuilder()
                .withStorageType(FileSystemViewStorageType.MEMORY)
                .withIncrementalTimelineSync(false)
                .build();
        FileSystemViewManager viewMgr = FileSystemViewManager.createViewManagerWithTableMetadata(
                new HudiLocalEngineContext(hudiClient.getHadoopConf()),
                metadataConfig,
                viewConf,
                null);
        return (HoodieTableFileSystemView) viewMgr.getFileSystemView(hudiClient);
    }

// private void getHudiTable() {
// HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
// HoodieTable hoodieTable = HoodieJavaTable.create(clientConfig, context, metaClient);
// HoodieIndexUtils.
// }

@Override
protected Map<String, String> getLocationProperties() throws UserException {
if (incrementalRead) {
@@ -328,7 +349,7 @@ private List<Split> getIncrementalSplits() {
incrementalRelation.getEndTs())).collect(Collectors.toList());
}

-    private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException {
+    private void getPartitionSplits2(HivePartition partition, List<Split> splits) throws IOException {
String globPath;
String partitionName;
if (partition.isDummyPartition()) {
@@ -361,6 +382,46 @@ private void getPartitionSplits(HivePartition partition, List<Split> splits) thr
}
}

    private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException {
        if (isCowOrRoTable) {
            // COW or read-optimized MOR: only the latest base files visible at the query instant are read.
            Stream<HoodieBaseFile> latestBaseFilesBeforeOrOn;
            if (partition.isDummyPartition()) {
                Map<String, Stream<HoodieBaseFile>> allLatestBaseFilesBeforeOrOn =
                        partitionFileSystemView.getAllLatestBaseFilesBeforeOrOn(queryInstant);
                latestBaseFilesBeforeOrOn = allLatestBaseFilesBeforeOrOn.get("");
            } else {
                String partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
                        new Path(partition.getPath()));
                latestBaseFilesBeforeOrOn =
                        partitionFileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant);
            }
            latestBaseFilesBeforeOrOn.forEach(baseFile -> {
                noLogsSplitNum.incrementAndGet();
                String filePath = baseFile.getPath();
                long fileSize = baseFile.getFileSize();
                // Need to add the hdfs host to the location
                LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
                splits.add(new FileSplit(locationPath, 0, fileSize, fileSize, 0,
                        new String[0], partition.getPartitionValues()));
            });
        } else {
            // MOR snapshot read: merge each latest file slice's base file with its log files.
            Stream<FileSlice> fileSliceStream;
            if (partition.isDummyPartition()) {
                Map<String, Stream<FileSlice>> allLatestFileSlicesBeforeOrOn =
                        partitionFileSystemView.getAllLatestFileSlicesBeforeOrOn(queryInstant);
                fileSliceStream = allLatestFileSlicesBeforeOrOn.get("");
            } else {
                String partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
                        new Path(partition.getPath()));
                fileSliceStream =
                        partitionFileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant);
            }
            fileSliceStream.forEach(fileSlice -> splits.add(
                    generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
        }
    }


private void getPartitionsSplits(List<HivePartition> partitions, List<Split> splits) {
Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
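For readers less familiar with Hudi's file layout, an illustrative sketch (not part of the commit) of what a merge-on-read FileSlice contains: an optional base file plus delta log files, which is what the MOR branch of the new getPartitionSplits hands to generateHudiSplit. The helper class below is hypothetical.

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.FileSlice;

public class FileSliceSketch {
    // Print the base (columnar) file and the delta log files that make up one file slice.
    public static void describe(FileSlice fileSlice) {
        String basePath = fileSlice.getBaseFile().isPresent()
                ? fileSlice.getBaseFile().get().getPath()
                : "<no base file>";
        List<String> logPaths = fileSlice.getLogFiles()
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
        System.out.println("base=" + basePath + ", logs=" + logPaths);
    }
}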
