Skip to content

Commit

Permalink
add iceberg stats when reading snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
the-other-tim-brown committed Jan 7, 2025
1 parent 8c143a7 commit 4db9bb8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public InternalSnapshot getCurrentSnapshot() {
Snapshot currentSnapshot = iceTable.currentSnapshot();
InternalTable irTable = getTable(currentSnapshot);

TableScan scan = iceTable.newScan().useSnapshot(currentSnapshot.snapshotId());
TableScan scan =
iceTable.newScan().useSnapshot(currentSnapshot.snapshotId()).includeColumnStats();
PartitionSpec partitionSpec = iceTable.spec();
List<PartitionFileGroup> partitionedDataFiles;
try (CloseableIterable<FileScanTask> files = scan.planFiles()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ public void testGetCurrentSnapshot(@TempDir Path workingDir) throws IOException
PartitionValue partitionEntry = partitionValues.iterator().next();
assertEquals(
"cs_sold_date_sk", partitionEntry.getPartitionField().getSourceField().getName());
// TODO generate test with column stats
assertEquals(0, internalDataFile.getColumnStats().size());
assertEquals(7, internalDataFile.getColumnStats().size());
}
}

Expand Down Expand Up @@ -202,12 +201,12 @@ public void testGetTableChangeForCommit(@TempDir Path workingDir) throws IOExcep
Snapshot snapshot5 = catalogSales.currentSnapshot();
Snapshot snapshot4 = catalogSales.snapshot(snapshot5.parentId());

validateTableChangeDiffSize(catalogSales, snapshot1, 5, 0);
validateTableChangeDiffSize(catalogSales, snapshot2, 0, 3);
validateTableChangeDiffSize(catalogSales, snapshot3, 5, 0);
validateTableChangeDiffSize(catalogSales, snapshot1, 5, 0, 7);
validateTableChangeDiffSize(catalogSales, snapshot2, 0, 3, 7);
validateTableChangeDiffSize(catalogSales, snapshot3, 5, 0, 7);
// transaction related snapshot verification
validateTableChangeDiffSize(catalogSales, snapshot4, 0, 1);
validateTableChangeDiffSize(catalogSales, snapshot5, 1, 0);
validateTableChangeDiffSize(catalogSales, snapshot4, 0, 1, 7);
validateTableChangeDiffSize(catalogSales, snapshot5, 1, 0, 7);

assertEquals(4, catalogSales.history().size());
catalogSales.expireSnapshots().expireSnapshotId(snapshot1.snapshotId()).commit();
Expand Down Expand Up @@ -242,7 +241,7 @@ public void testGetTableChangeForCommit(@TempDir Path workingDir) throws IOExcep
catalogSales.updateSpec().removeField("cs_sold_date_sk").commit();
Snapshot snapshot8 = catalogSales.currentSnapshot();

validateTableChangeDiffSize(catalogSales, snapshot7, 1, 2);
validateTableChangeDiffSize(catalogSales, snapshot7, 1, 2, 7);
assertEquals(snapshot7, snapshot8);
}

Expand Down Expand Up @@ -311,10 +310,13 @@ private static long getDataFileCount(Table catalogSales) throws IOException {
}

// Test helper: checks the file-level diff reported for a single snapshot.
//
// Obtains a conversion source for `table` via getIcebergConversionSource, asks it for the
// TableChange of `snapshot`, and then asserts that:
//   - the number of added files equals `addedFiles`,
//   - every added file reports exactly `numberOfColumns` column-stats entries,
//   - the number of removed files equals `removedFiles`.
//
// Column stats are only checked on added files; removed files are only counted.
//
// NOTE(review): two parameter-list lines follow the method name. They look like the
// pre-change (four-parameter) and post-change (five-parameter) versions of the same line
// from a diff view; the body reads `numberOfColumns`, so presumably only the
// five-parameter form is meant to remain. Confirm against the real source file.
private void validateTableChangeDiffSize(
Table table, Snapshot snapshot, int addedFiles, int removedFiles) {
Table table, Snapshot snapshot, int addedFiles, int removedFiles, int numberOfColumns) {
IcebergConversionSource conversionSource = getIcebergConversionSource(table);
TableChange tableChange = conversionSource.getTableChangeForCommit(snapshot);
assertEquals(addedFiles, tableChange.getFilesDiff().getFilesAdded().size());
// Stream.allMatch returns true for an empty stream, so this check passes trivially when
// the snapshot added no files; the size assertion above is what pins the added-file count.
assertTrue(
tableChange.getFilesDiff().getFilesAdded().stream()
.allMatch(file -> file.getColumnStats().size() == numberOfColumns));
assertEquals(removedFiles, tableChange.getFilesDiff().getFilesRemoved().size());
}

Expand Down

0 comments on commit 4db9bb8

Please sign in to comment.