Position merge compat #2

Open · wants to merge 18 commits into base: add_schema_evolution_to_fg_reader
@@ -27,7 +27,6 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.InternalSchemaCache;
@@ -37,24 +36,17 @@
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;

import org.apache.avro.Schema;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
@@ -327,58 +319,4 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
}
return Option.empty();
}

/**
* Filters a record out of downstream processing when:
* 1. A set of pre-specified keys (or key prefixes) exists, and
* 2. The key of the record does not match any entry in the set.
*/
protected boolean shouldSkip(T record, String keyFieldName, boolean isFullKey, Set<String> keys, Schema dataBlockSchema) {
Object keyValue = readerContext.getValue(record, dataBlockSchema, keyFieldName);
// Cannot extract the record key: check for null before calling toString(), then throw.
if (keyValue == null || keyValue.toString().isEmpty()) {
throw new HoodieKeyException("Can not extract the key for a record");
}
String recordKey = keyValue.toString();

// No keys are specified. Cannot skip at all.
if (keys.isEmpty()) {
return false;
}

// When the record key matches one of the keys or key prefixes, we cannot skip.
if ((isFullKey && keys.contains(recordKey))
|| (!isFullKey && keys.stream().anyMatch(recordKey::startsWith))) {
return false;
}

// Otherwise, this record is not needed.
return true;
}
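
Since the hunk header (-327,58 +319,4) shows this helper being deleted outright, its full-key vs. key-prefix semantics are worth pinning down. Below is a minimal, self-contained sketch of the same rule; the class name and sample keys are illustrative only, not part of this PR:

```java
import java.util.Set;

public class SkipRuleSketch {
  // Mirrors the shouldSkip logic above: keep a record when no keys are
  // configured, or when its key matches a full key / starts with a prefix.
  static boolean shouldSkip(String recordKey, boolean isFullKey, Set<String> keys) {
    if (keys.isEmpty()) {
      return false; // no filter configured; nothing is skipped
    }
    boolean matches = isFullKey
        ? keys.contains(recordKey)
        : keys.stream().anyMatch(recordKey::startsWith);
    return !matches; // skip records that match nothing
  }

  public static void main(String[] args) {
    Set<String> prefixes = Set.of("2024-01");
    System.out.println(shouldSkip("2024-01-15:uuid-1", false, prefixes)); // false: prefix hit
    System.out.println(shouldSkip("2024-02-03:uuid-2", false, prefixes)); // true: filtered out
    System.out.println(shouldSkip("any-key", true, Set.of()));            // false: empty filter
  }
}
```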

/**
* Extracts the record positions from a log block header.
*
* @param logBlock the log block whose header carries the record positions
* @return the record positions as a list
* @throws IOException if the positions cannot be read from the block
*/
protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock) throws IOException {
List<Long> blockPositions = new ArrayList<>();

Roaring64NavigableMap positions = logBlock.getRecordPositions();
if (positions == null || positions.isEmpty()) {
throw new HoodieValidationException("No record position info is found when attempting a position-based merge.");
}

Iterator<Long> iterator = positions.iterator();
while (iterator.hasNext()) {
blockPositions.add(iterator.next());
}

if (blockPositions.isEmpty()) {
throw new HoodieCorruptedDataException("No positions are extracted.");
}

return blockPositions;
}
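
This helper is also removed by the hunk. For reference, it flattens the compressed position bitmap carried in a log block header into a List<Long>. A runnable sketch of the same flattening against a hand-built bitmap (the positions are made up; only Roaring64NavigableMap is the real dependency):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class PositionFlattenSketch {
  public static void main(String[] args) {
    // Stand-in for HoodieLogBlock#getRecordPositions(): positions of records
    // within the base file, stored as a compressed 64-bit bitmap.
    Roaring64NavigableMap positions = new Roaring64NavigableMap();
    positions.addLong(3L);
    positions.addLong(17L);
    positions.addLong(42L);

    // Same flattening loop as the removed helper above.
    List<Long> blockPositions = new ArrayList<>();
    Iterator<Long> iterator = positions.iterator();
    while (iterator.hasNext()) {
      blockPositions.add(iterator.next());
    }
    System.out.println(blockPositions); // [3, 17, 42]
  }
}
```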
}
@@ -127,12 +127,11 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
* Initialize internal iterators on the base and log files.
*/
public void initRecordIterators() throws IOException {
ClosableIterator<T> iter = makeBaseFileIterator();
if (logFiles.isEmpty()) {
this.baseFileIterator = CachingIterator.wrap(iter, readerContext);
this.baseFileIterator = CachingIterator.wrap(makeBaseFileIterator(), readerContext);
} else {
this.baseFileIterator = iter;
scanLogFiles();
this.baseFileIterator = makeBaseFileIterator();
recordBuffer.setBaseFileIterator(baseFileIterator);
}
}
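
This hunk interleaves the removed and added lines; reconstructed from the line counts (-127,12 +127,11), the method after the change reads as below. The notable shift: when log files are present, scanLogFiles() now runs before the base-file iterator is created, and the fresh iterator is handed to the record buffer, presumably so the buffer can drive the merge itself:

```java
public void initRecordIterators() throws IOException {
  if (logFiles.isEmpty()) {
    // No log files to merge: serve records straight from the base file.
    this.baseFileIterator = CachingIterator.wrap(makeBaseFileIterator(), readerContext);
  } else {
    // Populate the record buffer from the log files first, then attach
    // the base-file iterator so merging can proceed record by record.
    scanLogFiles();
    this.baseFileIterator = makeBaseFileIterator();
    recordBuffer.setBaseFileIterator(baseFileIterator);
  }
}
```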
@@ -103,7 +103,7 @@ public void processNextDataRecord(T record, Map<String, Object> metadata, Serial
}

@Override
public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {
public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws IOException {
Iterator<DeleteRecord> it = Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
while (it.hasNext()) {
DeleteRecord record = it.next();
@@ -114,12 +114,8 @@ public void processDeleteBlock(HoodieDeleteBlock deleteBlock) {

@Override
public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable recordKey) {
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordKey);
Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, existingRecordMetadataPair);
if (recordOpt.isPresent()) {
records.put(recordKey, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
(String) recordKey, recordOpt.get().getPartitionPath(), recordOpt.get().getOrderingValue())));
}
records.put(recordKey, Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
(String) recordKey, deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue())));
}
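
Worth noting in this hunk: the removed version first looked up the buffered entry and let doProcessNextDeletedRecord arbitrate (presumably comparing ordering values) before recording the delete, while the new version unconditionally buffers the delete, rebuilding the metadata from the DeleteRecord's own partition path and ordering value. Within this buffer, a delete for a key now always replaces whatever was previously stored under it.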

@Override
@@ -134,17 +130,7 @@ protected boolean doHasNext() throws IOException {
// Handle merging.
while (baseFileIterator.hasNext()) {
T baseRecord = baseFileIterator.next();

String recordKey = readerContext.getRecordKey(baseRecord, readerSchema);
Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
Map<String, Object> metadata = readerContext.generateMetadataForRecord(
baseRecord, readerSchema);

Option<T> resultRecord = logRecordInfo != null
? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
: merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
if (resultRecord.isPresent()) {
nextRecord = readerContext.seal(resultRecord.get());
if (doHasNextMerge(baseRecord)) {
return true;
}
}
@@ -166,4 +152,20 @@ protected boolean doHasNext() throws IOException {
}
return false;
}

protected boolean doHasNextMerge(T baseRecord) throws IOException {
String recordKey = readerContext.getRecordKey(baseRecord, readerSchema);
Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
Map<String, Object> metadata = readerContext.generateMetadataForRecord(
baseRecord, readerSchema);

Option<T> resultRecord = logRecordInfo != null
? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
: merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
if (resultRecord.isPresent()) {
nextRecord = readerContext.seal(resultRecord.get());
return true;
}
return false;
}
}
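
Taken together, the last two hunks lift the per-record merge step out of doHasNext() into an overridable doHasNextMerge(T baseRecord). Given the PR title, a plausible use (an assumption; no such subclass appears in this diff) is a position-based buffer that overrides the key lookup with a position lookup, roughly:

```java
// Hypothetical subclass sketch, not part of this diff. Assumes the buffer is
// keyed by Long positions and tracks the next base-file position to consume;
// imports and the surrounding class are omitted.
@Override
protected boolean doHasNextMerge(T baseRecord) throws IOException {
  Long position = nextRecordPosition++; // assumed field: position of baseRecord in the base file
  Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(position);
  Map<String, Object> metadata = readerContext.generateMetadataForRecord(baseRecord, readerSchema);

  Option<T> resultRecord = logRecordInfo != null
      ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
      : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
  if (resultRecord.isPresent()) {
    nextRecord = readerContext.seal(resultRecord.get());
    return true;
  }
  return false;
}
```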