diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 08660906ee72..704798dcacc2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -27,7 +27,6 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.InternalSchemaCache;
@@ -37,24 +36,17 @@
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieKeyException;
-import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 
 import org.apache.avro.Schema;
-import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Function;
 
 import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
@@ -327,58 +319,4 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
     }
     return Option.empty();
   }
-
-  /**
-   * Filter a record for downstream processing when:
-   * 1. A set of pre-specified keys exists.
-   * 2. The key of the record is not contained in the set.
-   */
-  protected boolean shouldSkip(T record, String keyFieldName, boolean isFullKey, Set<String> keys, Schema dataBlockSchema) {
-    String recordKey = readerContext.getValue(record, dataBlockSchema, keyFieldName).toString();
-    // Can not extract the record key, throw.
-    if (recordKey == null || recordKey.isEmpty()) {
-      throw new HoodieKeyException("Can not extract the key for a record");
-    }
-
-    // No keys are specified. Cannot skip at all.
-    if (keys.isEmpty()) {
-      return false;
-    }
-
-    // When the record key matches with one of the keys or key prefixes, can not skip.
-    if ((isFullKey && keys.contains(recordKey))
-        || (!isFullKey && keys.stream().anyMatch(recordKey::startsWith))) {
-      return false;
-    }
-
-    // Otherwise, this record is not needed.
-    return true;
-  }
-
-  /**
-   * Extract the record positions from a log block header.
-   *
-   * @param logBlock
-   * @return
-   * @throws IOException
-   */
-  protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock) throws IOException {
-    List<Long> blockPositions = new ArrayList<>();
-
-    Roaring64NavigableMap positions = logBlock.getRecordPositions();
-    if (positions == null || positions.isEmpty()) {
-      throw new HoodieValidationException("No record position info is found when attempt to do position based merge.");
-    }
-
-    Iterator<Long> iterator = positions.iterator();
-    while (iterator.hasNext()) {
-      blockPositions.add(iterator.next());
-    }
-
-    if (blockPositions.isEmpty()) {
-      throw new HoodieCorruptedDataException("No positions are extracted.");
-    }
-
-    return blockPositions;
-  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index a780a0d4f774..80af415bae38 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -127,12 +127,11 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
    * Initialize internal iterators on the base and log files.
    */
   public void initRecordIterators() throws IOException {
-    ClosableIterator<T> iter = makeBaseFileIterator();
     if (logFiles.isEmpty()) {
-      this.baseFileIterator = CachingIterator.wrap(iter, readerContext);
+      this.baseFileIterator = CachingIterator.wrap(makeBaseFileIterator(), readerContext);
     } else {
-      this.baseFileIterator = iter;
       scanLogFiles();
+      this.baseFileIterator = makeBaseFileIterator();
       recordBuffer.setBaseFileIterator(baseFileIterator);
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 689e43ce1d83..055bdab1cf55 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -103,7 +103,7 @@ public void processNextDataRecord(T record, Map<String, Object> metadata, Serial
   }
 
   @Override
-  public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {
+  public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws IOException {
     Iterator<DeleteRecord> it = Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
     while (it.hasNext()) {
       DeleteRecord record = it.next();
@@ -114,12 +114,8 @@ public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {
 
   @Override
   public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable recordKey) {
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordKey);
-    Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, existingRecordMetadataPair);
-    if (recordOpt.isPresent()) {
-      records.put(recordKey, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
-          (String) recordKey, recordOpt.get().getPartitionPath(), recordOpt.get().getOrderingValue())));
-    }
+    records.put(recordKey, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
+        (String) recordKey, deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue())));
   }
 
   @Override
@@ -134,17 +130,7 @@ protected boolean doHasNext() throws IOException {
     // Handle merging.
     while (baseFileIterator.hasNext()) {
       T baseRecord = baseFileIterator.next();
-
-      String recordKey = readerContext.getRecordKey(baseRecord, readerSchema);
-      Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
-      Map<String, Object> metadata = readerContext.generateMetadataForRecord(
-          baseRecord, readerSchema);
-
-      Option<T> resultRecord = logRecordInfo != null
-          ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
-          : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
-      if (resultRecord.isPresent()) {
-        nextRecord = readerContext.seal(resultRecord.get());
+      if (doHasNextMerge(baseRecord)) {
         return true;
       }
     }
@@ -166,4 +152,20 @@ protected boolean doHasNext() throws IOException {
     }
     return false;
   }
+
+  protected boolean doHasNextMerge(T baseRecord) throws IOException {
+    String recordKey = readerContext.getRecordKey(baseRecord, readerSchema);
+    Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
+    Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+        baseRecord, readerSchema);
+
+    Option<T> resultRecord = logRecordInfo != null
+        ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
+        : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
+    if (resultRecord.isPresent()) {
+      nextRecord = readerContext.seal(resultRecord.get());
+      return true;
+    }
+    return false;
+  }
 }
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 04c30f4f4296..778ae09f6690 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -21,23 +21,26 @@
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieKeyException;
 
 import org.apache.avro.Schema;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,17 +49,22 @@
 import java.util.Set;
 import java.util.function.Function;
 
-import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+
 /**
  * A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
  * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} methods into record position based map.
  * Here the position means that record position in the base file. The records from the base file is accessed from an iterator object. These records are merged when the
  * {@link #hasNext} method is called.
  */
-public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> {
+public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedFileGroupRecordBuffer<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HoodiePositionBasedFileGroupRecordBuffer.class);
+
   public static final String ROW_INDEX_COLUMN_NAME = "row_index";
   public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = "_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
   private long nextRecordPosition = 0L;
+  private boolean needToDoHybridStrategy = false;
 
   public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                                   HoodieTableMetaClient hoodieTableMetaClient,
@@ -74,11 +82,24 @@ public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderCon
 
   @Override
   public BufferType getBufferType() {
-    return BufferType.POSITION_BASED_MERGE;
+    return readerContext.getUseRecordPosition() ? BufferType.POSITION_BASED_MERGE : super.getBufferType();
   }
 
   @Override
   public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
+    if (!readerContext.getUseRecordPosition()) {
+      super.processDataBlock(dataBlock, keySpecOpt);
+      return;
+    }
+    // Extract positions from data block.
+    List<Long> recordPositions = extractRecordPositions(dataBlock);
+    if (recordPositions == null) {
+      LOG.warn("Falling back to key-based merge for read");
+      fallbackToKeyBasedBuffer();
+      super.processDataBlock(dataBlock, keySpecOpt);
+      return;
+    }
+
     // Prepare key filters.
     Set<String> keys = new HashSet<>();
     boolean isFullKey = true;
@@ -95,9 +116,6 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
       enablePartialMerging = true;
     }
 
-    // Extract positions from data block.
-    List<Long> recordPositions = extractRecordPositions(dataBlock);
-
     Option<Pair<Function<T, T>, Schema>> schemaEvolutionTransformerOpt =
         composeEvolvedSchemaTransformer(dataBlock);
 
@@ -135,46 +153,43 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
     }
   }
 
-  @Override
-  public void processNextDataRecord(T record, Map<String, Object> metadata, Serializable recordPosition) throws IOException {
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordPosition);
-    Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata = doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
-    if (mergedRecordAndMetadata.isPresent()) {
-      records.put(recordPosition, Pair.of(
-          Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft())),
-          mergedRecordAndMetadata.get().getRight()));
+  private void fallbackToKeyBasedBuffer() {
+    readerContext.setUseRecordPosition(false);
+    // Need to make a copy of the keys to avoid a concurrent modification exception
+    ArrayList<Serializable> positions = new ArrayList<>(records.keySet());
+    for (Serializable position : positions) {
+      Pair<Option<T>, Map<String, Object>> entry = records.get(position);
+      Object recordKey = entry.getRight().get(INTERNAL_META_RECORD_KEY);
+      if (entry.getLeft().isPresent() || recordKey != null) {
+
+        records.put((String) recordKey, entry);
+        records.remove(position);
+      } else {
+        // If it's a delete record and the key is null, then we still need to use
+        // positions
+        needToDoHybridStrategy = true;
+      }
     }
   }
 
   @Override
   public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws IOException {
-    List<Long> recordPositions = extractRecordPositions(deleteBlock);
-    if (recordMerger.getMergingStrategy().equals(DEFAULT_MERGER_STRATEGY_UUID)) {
-      for (Long recordPosition : recordPositions) {
-        records.put(recordPosition,
-            Pair.of(Option.empty(), readerContext.generateMetadataForRecord(null, "", 0)));
-      }
+    if (!readerContext.getUseRecordPosition()) {
+      super.processDeleteBlock(deleteBlock);
       return;
     }
 
-    int recordIndex = 0;
-    Iterator<DeleteRecord> it = Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
-    while (it.hasNext()) {
-      DeleteRecord record = it.next();
-      long recordPosition = recordPositions.get(recordIndex++);
-      processNextDeletedRecord(record, recordPosition);
+    List<Long> recordPositions = extractRecordPositions(deleteBlock);
+    if (recordPositions == null) {
+      LOG.warn("Falling back to key-based merge for read");
+      fallbackToKeyBasedBuffer();
+      super.processDeleteBlock(deleteBlock);
+      return;
     }
-  }
 
-  @Override
-  public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable recordPosition) {
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordPosition);
-    Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, existingRecordMetadataPair);
-    if (recordOpt.isPresent()) {
-      String recordKey = recordOpt.get().getRecordKey();
-      records.put(recordPosition, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
-          recordKey, recordOpt.get().getPartitionPath(), recordOpt.get().getOrderingValue())));
+    for (Long recordPosition : recordPositions) {
+      records.put(recordPosition,
+          Pair.of(Option.empty(), readerContext.generateMetadataForRecord(null, "", 0)));
     }
   }
 
@@ -186,43 +201,97 @@ public boolean containsLogRecord(String recordKey) {
   }
 
   @Override
-  protected boolean doHasNext() throws IOException {
-    ValidationUtils.checkState(baseFileIterator != null, "Base file iterator has not been set yet");
-
-    // Handle merging.
-    while (baseFileIterator.hasNext()) {
-      T baseRecord = baseFileIterator.next();
-      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema, ROW_INDEX_COLUMN_NAME, nextRecordPosition);
-      Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++);
-
-      Map<String, Object> metadata = readerContext.generateMetadataForRecord(
-          baseRecord, readerSchema);
-
-      Option<T> resultRecord = logRecordInfo != null
-          ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
-          : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
-      if (resultRecord.isPresent()) {
-        nextRecord = readerContext.seal(resultRecord.get());
-        return true;
-      }
+  protected boolean doHasNextMerge(T baseRecord) throws IOException {
+    if (!readerContext.getUseRecordPosition()) {
+      return doHasNextFallbackMerge(baseRecord);
     }
-
-    // Handle records solely from log files.
-    if (logRecordIterator == null) {
-      logRecordIterator = records.values().iterator();
+    nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema,
+        ROW_INDEX_COLUMN_NAME, nextRecordPosition);
+    Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++);
+
+    Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+        baseRecord, readerSchema);
+
+    Option<T> resultRecord = logRecordInfo != null
+        ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
+        : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
+    if (resultRecord.isPresent()) {
+      nextRecord = readerContext.seal(resultRecord.get());
+      return true;
     }
+    return false;
+  }
 
-    while (logRecordIterator.hasNext()) {
-      Pair<Option<T>, Map<String, Object>> nextRecordInfo = logRecordIterator.next();
-      Option<T> resultRecord;
-      resultRecord = merge(Option.empty(), Collections.emptyMap(),
-          nextRecordInfo.getLeft(), nextRecordInfo.getRight());
-      if (resultRecord.isPresent()) {
-        nextRecord = readerContext.seal(resultRecord.get());
-        return true;
+  private boolean doHasNextFallbackMerge(T baseRecord) throws IOException {
+    if (needToDoHybridStrategy) {
+      // See if there is a delete block with record positions
+      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema,
+          ROW_INDEX_COLUMN_NAME, nextRecordPosition);
+      Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++);
+      if (logRecordInfo != null) {
+        // We have a delete that could not be converted. Since it is the newest version, the record is deleted;
+        // remove a key-based record if it exists
+        records.remove(readerContext.getRecordKey(baseRecord, readerSchema));
+        return false;
      }
     }
+    return super.doHasNextMerge(baseRecord);
+  }
 
-    return false;
+  /**
+   * Filter a record for downstream processing when:
+   * 1. A set of pre-specified keys exists.
+   * 2. The key of the record is not contained in the set.
+   */
+  protected boolean shouldSkip(T record, String keyFieldName, boolean isFullKey, Set<String> keys, Schema dataBlockSchema) {
+    String recordKey = readerContext.getValue(record, dataBlockSchema, keyFieldName).toString();
+    // Cannot extract the record key, throw.
+    if (recordKey == null || recordKey.isEmpty()) {
+      throw new HoodieKeyException("Can not extract the key for a record");
+    }
+
+    // No keys are specified. Cannot skip at all.
+    if (keys.isEmpty()) {
+      return false;
+    }
+
+    // When the record key matches with one of the keys or key prefixes, cannot skip.
+    if ((isFullKey && keys.contains(recordKey))
+        || (!isFullKey && keys.stream().anyMatch(recordKey::startsWith))) {
+      return false;
+    }
+
+    // Otherwise, this record is not needed.
+    return true;
+  }
+
+  /**
+   * Extract the record positions from a log block header.
+   *
+   * @param logBlock the log block from which to extract record positions
+   * @return the list of record positions, or {@code null} if no position info is available
+   * @throws IOException if the record positions cannot be read from the block
+   */
+  protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock) throws IOException {
+    List<Long> blockPositions = new ArrayList<>();
+
+    Roaring64NavigableMap positions = logBlock.getRecordPositions();
+    if (positions == null || positions.isEmpty()) {
+      LOG.warn("No record position info found when attempting to do position-based merge.");
+      return null;
+    }
+
+    Iterator<Long> iterator = positions.iterator();
+    while (iterator.hasNext()) {
+      blockPositions.add(iterator.next());
+    }
+
+    if (blockPositions.isEmpty()) {
+      LOG.warn("No positions were extracted.");
+      return null;
+    }
+
+    return blockPositions;
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 2cdf50909aaa..5e0288c2e2da 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
@@ -25,6 +26,7 @@ import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, HoodieCDCFileGrou
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.FileIOUtils
@@ -151,30 +153,8 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
             val hoodieBaseFile = fileSlice.getBaseFile.get()
             baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues, hoodieBaseFile.getStoragePath, 0, hoodieBaseFile.getFileLen))
           } else {
-            val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, tableState.recordKeyField, filters)
-            val storageConf = broadcastedStorageConf.value
-            val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
-              .builder().setConf(storageConf).setBasePath(tableState.tablePath).build
-            val reader = new HoodieFileGroupReader[InternalRow](
-              readerContext,
-              storageConf,
-              tableState.tablePath,
-              tableState.latestCommitTimestamp.get,
-              fileSlice,
-              broadcastedDataSchema.value,
-              broadcastedRequestedSchema.value,
-              internalSchemaOpt,
-              metaClient,
-              metaClient.getTableConfig.getProps,
-              metaClient.getTableConfig,
-              file.start,
-              file.length,
-              shouldUseRecordPosition,
-              options.getOrElse(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue() + "").toLong,
-              options.getOrElse(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath),
-              DiskMapType.valueOf(options.getOrElse(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT)),
-              options.getOrElse(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().toString).toBoolean)
-            reader.initRecordIterators()
+            val reader = buildAndInitReader(parquetFileReader.value, filters, broadcastedStorageConf.value,
+              fileSlice, broadcastedDataSchema.value, broadcastedRequestedSchema.value, file, options)
             // Append partition values to rows and project to output schema
             appendPartitionAndProject(
               reader.getClosableIterator,
@@ -229,6 +209,40 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       props)
   }
 
+  protected def buildAndInitReader(parquetFileReader: SparkParquetReader,
+                                   filters: Seq[Filter],
+                                   storageConf: StorageConfiguration[_],
+                                   fileSlice: FileSlice,
+                                   dataAvroSchema: Schema,
+                                   requestedAvroSchema: Schema,
+                                   file: PartitionedFile,
+                                   options: Map[String, String]): HoodieFileGroupReader[InternalRow] = {
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader, tableState.recordKeyField, filters)
+    val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
+      .builder().setConf(storageConf).setBasePath(tableState.tablePath).build
+    val reader = new HoodieFileGroupReader[InternalRow](
+      readerContext,
+      storageConf,
+      tableState.tablePath,
+      tableState.latestCommitTimestamp.get,
+      fileSlice,
+      dataAvroSchema,
+      requestedAvroSchema,
+      internalSchemaOpt,
+      metaClient,
+      metaClient.getTableConfig.getProps,
+      metaClient.getTableConfig,
+      file.start,
+      file.length,
+      shouldUseRecordPosition,
+      options.getOrElse(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue() + "").toLong,
+      options.getOrElse(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath),
+      DiskMapType.valueOf(options.getOrElse(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT)),
+      options.getOrElse(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().toString).toBoolean)
+    reader.initRecordIterators()
+    reader
+  }
+
   private def appendPartitionAndProject(iter: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
                                         inputSchema: StructType,
                                         partitionSchema: StructType,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index c5c9ba775c99..cdab0d06ec1f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -39,7 +39,6 @@
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.apache.avro.Schema;
@@ -61,10 +60,7 @@
 import static org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
 import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupReaderOnSpark {
   private final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF);
@@ -101,6 +97,7 @@ public void prepareBuffer(boolean useCustomMerger) throws Exception {
         ? Option.empty()
         : Option.of(partitionPaths[0]);
     HoodieReaderContext<InternalRow> ctx = getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf());
+    ctx.setUseRecordPosition(true);
     ctx.setHasBootstrapBaseFile(false);
     ctx.setHasLogFiles(true);
     ctx.setNeedsBootstrapMerge(false);
@@ -169,22 +166,12 @@ public void testProcessDeleteBlockWithPositions() throws Exception {
     assertNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
   }
 
-  @Test
-  public void testProcessDeleteBlockWithCustomMerger() throws Exception {
-    prepareBuffer(true);
-    HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
-    buffer.processDeleteBlock(deleteBlock);
-    assertEquals(50, buffer.getLogRecords().size());
-    assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
-  }
-
   @Test
   public void testProcessDeleteBlockWithoutPositions() throws Exception {
     prepareBuffer(false);
     HoodieDeleteBlock deleteBlock = getDeleteBlockWithoutPositions();
-    Exception exception = assertThrows(
-        HoodieValidationException.class, () -> buffer.processDeleteBlock(deleteBlock));
-    assertTrue(exception.getMessage().contains("No record position info is found"));
+    buffer.processDeleteBlock(deleteBlock);
+    assertEquals(50, buffer.getLogRecords().size());
   }
 
   public static class CustomMerger implements HoodieRecordMerger {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPositionBasedMergingFallback.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPositionBasedMergingFallback.scala
new file mode 100644
index 000000000000..2e3a3995ec4b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPositionBasedMergingFallback.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.model.HoodieRecordMerger
+import org.apache.hudi.common.util
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.JFunction
+import org.apache.spark.sql.SaveMode.{Append, Overwrite}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.util.function.Consumer
+
+class TestPositionBasedMergingFallback extends HoodieSparkClientTestBase {
+  var spark: SparkSession = null
+
+  override def getSparkSessionExtensionsInjector: util.Option[Consumer[SparkSessionExtensions]] =
+    toJavaOption(
+      Some(
+        JFunction.toJavaConsumer((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver)))
+    )
+
+  @BeforeEach override def setUp(): Unit = {
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initHoodieStorage()
+  }
+
+  @AfterEach override def tearDown(): Unit = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+    FileSystem.closeAll()
+    System.gc()
+  }
+
+  @ParameterizedTest
+  @MethodSource(Array("testArgs"))
+  def testPositionFallback(updateWithRecordPositions: String, deleteWithRecordPositions: String, secondUpdateWithPositions: String): Unit = {
+    val columns = Seq("ts", "key", "name", "_hoodie_is_deleted")
+    val data = Seq(
+      (10, "1", "A", false),
+      (10, "2", "B", false),
+      (10, "3", "C", false),
+      (10, "4", "D", false),
+      (10, "5", "E", false))
+
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option("hoodie.table.name", "test_table").
+      option(TABLE_TYPE.key(), "MERGE_ON_READ").
+      option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
+      option(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key(), HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID).
+      option(DataSourceWriteOptions.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.HoodieSparkRecordMerger").
+      option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
+      option(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true").
+      mode(Overwrite).
+      save(basePath)
+
+    val updateData = Seq((11, "1", "A_1", false), (9, "2", "B_1", false))
+
+    val updates = spark.createDataFrame(updateData).toDF(columns: _*)
+
+    updates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option("hoodie.table.name", "test_table").
+      option(TABLE_TYPE.key(), "MERGE_ON_READ").
+      option(OPERATION.key(), "upsert").
+      option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
+      option(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key(), HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID).
+      option(DataSourceWriteOptions.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.HoodieSparkRecordMerger").
+      option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
+      option(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), updateWithRecordPositions).
+      mode(Append).
+      save(basePath)
+
+    val deletesData = Seq((10, "4", "D", true), (10, "3", "C", true))
+
+    val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
+    deletes.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option("hoodie.table.name", "test_table").
+      option(TABLE_TYPE.key(), "MERGE_ON_READ").
+      option(OPERATION.key(), "upsert").
+      option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
+      option(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key(), HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID).
+      option(DataSourceWriteOptions.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.HoodieSparkRecordMerger").
+      option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
+      option(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), deleteWithRecordPositions).
+      mode(Append).
+      save(basePath)
+
+
+    val secondUpdateData = Seq((14, "5", "E_3", false), (3, "3", "C_3", false))
+    val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
+    secondUpdates.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option("hoodie.table.name", "test_table").
+      option(TABLE_TYPE.key(), "MERGE_ON_READ").
+      option(OPERATION.key(), "upsert").
+      option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
+      option(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key(), HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID).
+      option(DataSourceWriteOptions.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.HoodieSparkRecordMerger").
+      option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
+      option(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), secondUpdateWithPositions).
+      mode(Append).
+      save(basePath)
+
+    val df = spark.read.format("hudi").
+      option(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key(), HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID).
+      option(DataSourceWriteOptions.RECORD_MERGER_IMPLS.key(), "org.apache.hudi.HoodieSparkRecordMerger").
+      option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
+      option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), "true").load(basePath)
+    val finalDf = df.select("ts", "key", "name")
+    val finalColumns = Seq("ts", "key", "name")
+
+    val finalExpectedData = Seq(
+      (11, "1", "A_1"),
+      (10, "2", "B"),
+      (14, "5", "E_3"))
+
+    val expectedDf = spark.createDataFrame(finalExpectedData).toDF(finalColumns: _*)
+
+    assertEquals(0, finalDf.except(expectedDf).count())
+    assertEquals(0, expectedDf.except(finalDf).count())
+
+    // Test filter pushdown.
+    // If the filter is pushed down, then record 2 will be filtered from the base file,
+    // but record 2 in the log file won't be. Since the precombine value is larger in the base file,
+    // that record is the "winner" of merging, and therefore the record should not be
+    // in the output.
+    sqlContext.clearCache()
+    val finalFilterDf = finalDf.filter("name != 'B'")
+    val finalExpectedFilterData = Seq(
+      (11, "1", "A_1"),
+      (14, "5", "E_3"))
+
+    val expectedFilterDf = spark.createDataFrame(finalExpectedFilterData).toDF(finalColumns: _*)
+    assertEquals(0, finalFilterDf.except(expectedFilterDf).count())
+    assertEquals(0, expectedFilterDf.except(finalFilterDf).count())
+  }
+}
+
+object TestPositionBasedMergingFallback {
+  def testArgs: java.util.stream.Stream[Arguments] = {
+    val scenarios = Array(
+      Seq("true","true","true"),
+      Seq("false","true","true"),
+      Seq("true","false","true"),
+      Seq("false","false","true"),
+      Seq("true","true","false"),
+      Seq("false","true","false"),
+      Seq("true","false","false"),
+      Seq("false","false","false")
+    )
+    java.util.Arrays.stream(scenarios.map(as => Arguments.arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
+  }
+}