diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 62ae7886c573..bbdc3a3910ef 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 4
+  "modification": 3
 }
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 19fcbb7d1ea0..f7a9e5c8533d 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -65,8 +65,6 @@ dependencies {
   testImplementation library.java.bigdataoss_gcsio
   testImplementation library.java.bigdataoss_gcs_connector
   testImplementation library.java.bigdataoss_util_hadoop
-  testImplementation "org.apache.parquet:parquet-avro:$parquet_version"
-  testImplementation "org.apache.parquet:parquet-common:$parquet_version"
   testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
   testImplementation project(":sdks:java:extensions:google-cloud-platform-core")
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index c9906618a64d..5784dfd79744 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -35,7 +35,6 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.apache.iceberg.data.Record;
@@ -48,7 +47,6 @@
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Type;
@@ -100,8 +98,6 @@ public boolean advance() throws IOException {
     // which are not null-safe.
     @SuppressWarnings("nullness")
     org.apache.iceberg.@NonNull Schema project = this.project;
-    @Nullable
-    String nameMapping = source.getTable().properties().get(TableProperties.DEFAULT_NAME_MAPPING);
 
     do {
       // If our current iterator is working... do that.
@@ -133,52 +129,37 @@ public boolean advance() throws IOException {
       switch (file.format()) {
         case ORC:
           LOG.info("Preparing ORC input");
-          ORC.ReadBuilder orcReader =
+          iterable =
              ORC.read(input)
                  .split(fileTask.start(), fileTask.length())
                  .project(project)
                  .createReaderFunc(
                      fileSchema ->
                          GenericOrcReader.buildReader(project, fileSchema, idToConstants))
-                  .filter(fileTask.residual());
-
-          if (nameMapping != null) {
-            orcReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
-          }
-
-          iterable = orcReader.build();
+                  .filter(fileTask.residual())
+                  .build();
           break;
         case PARQUET:
           LOG.info("Preparing Parquet input.");
-          Parquet.ReadBuilder parquetReader =
+          iterable =
              Parquet.read(input)
                  .split(fileTask.start(), fileTask.length())
                  .project(project)
                  .createReaderFunc(
                      fileSchema ->
                          GenericParquetReaders.buildReader(project, fileSchema, idToConstants))
-                  .filter(fileTask.residual());
-
-          if (nameMapping != null) {
-            parquetReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
-          }
-
-          iterable = parquetReader.build();
+                  .filter(fileTask.residual())
+                  .build();
           break;
         case AVRO:
           LOG.info("Preparing Avro input.");
-          Avro.ReadBuilder avroReader =
+          iterable =
              Avro.read(input)
                  .split(fileTask.start(), fileTask.length())
                  .project(project)
                  .createReaderFunc(
-                      fileSchema -> DataReader.create(project, fileSchema, idToConstants));
-
-          if (nameMapping != null) {
-            avroReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
-          }
-
-          iterable = avroReader.build();
+                      fileSchema -> DataReader.create(project, fileSchema, idToConstants))
+                  .build();
           break;
         default:
           throw new UnsupportedOperationException("Cannot read format: " + file.format());
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index 1c3f9b53f31a..6ff3bdf6a4ff 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -21,8 +21,6 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 
-import java.io.File;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -30,8 +28,6 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
@@ -40,31 +36,13 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Metrics;
-import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.hadoop.HadoopInputFile;
-import org.apache.iceberg.mapping.MappedField;
-import org.apache.iceberg.mapping.MappedFields;
-import org.apache.iceberg.mapping.NameMapping;
-import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.parquet.ParquetUtil;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -232,184 +210,4 @@ public void testIdentityColumnScan() throws Exception {
 
     testPipeline.run();
   }
-
-  @Test
-  public void testNameMappingScan() throws Exception {
-    org.apache.avro.Schema metadataSchema =
-        org.apache.avro.Schema.createRecord(
-            "metadata",
-            null,
-            null,
-            false,
-            ImmutableList.of(
-                new org.apache.avro.Schema.Field(
-                    "source",
-                    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
-                    null,
-                    null)));
-
-    org.apache.avro.Schema avroSchema =
-        org.apache.avro.Schema.createRecord(
-            "test",
-            null,
-            null,
-            false,
-            ImmutableList.of(
-                new org.apache.avro.Schema.Field(
-                    "data",
-                    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
-                    null,
-                    null),
-                new org.apache.avro.Schema.Field(
-                    "id",
-                    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
-                    null,
-                    null),
-                new org.apache.avro.Schema.Field("metadata", metadataSchema, null, null)));
-
-    List<Map<String, Object>> recordData =
-        ImmutableList.<Map<String, Object>>builder()
-            .add(
-                ImmutableMap.of(
-                    "id",
-                    0L,
-                    "data",
-                    "clarification",
-                    "metadata",
-                    ImmutableMap.of("source", "systemA")))
-            .add(
-                ImmutableMap.of(
-                    "id", 1L, "data", "risky", "metadata", ImmutableMap.of("source", "systemB")))
-            .add(
-                ImmutableMap.of(
-                    "id", 2L, "data", "falafel", "metadata", ImmutableMap.of("source", "systemC")))
-            .build();
-
-    List<GenericRecord> avroRecords =
-        recordData.stream()
-            .map(data -> avroGenericRecord(avroSchema, data))
-            .collect(Collectors.toList());
-
-    Configuration hadoopConf = new Configuration();
-    String path = createParquetFile(avroSchema, avroRecords);
-    HadoopInputFile inputFile = HadoopInputFile.fromLocation(path, hadoopConf);
-
-    NameMapping defaultMapping =
-        NameMapping.of(
-            MappedField.of(1, "id"),
-            MappedField.of(2, "data"),
-            MappedField.of(3, "metadata", MappedFields.of(MappedField.of(4, "source"))));
-    ImmutableMap<String, String> tableProperties =
-        ImmutableMap.<String, String>builder()
-            .put(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(defaultMapping))
-            .build();
-
-    TableIdentifier tableId =
-        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
-    Table simpleTable =
-        warehouse
-            .buildTable(tableId, TestFixtures.NESTED_SCHEMA)
-            .withProperties(tableProperties)
-            .withPartitionSpec(PartitionSpec.unpartitioned())
-            .create();
-
-    MetricsConfig metricsConfig = MetricsConfig.forTable(simpleTable);
-    Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig);
-    DataFile dataFile =
-        DataFiles.builder(PartitionSpec.unpartitioned())
-            .withFormat(FileFormat.PARQUET)
-            .withInputFile(inputFile)
-            .withMetrics(metrics)
-            .build();
-
-    final Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.NESTED_SCHEMA);
-
-    simpleTable.newFastAppend().appendFile(dataFile).commit();
-
-    Map<String, String> catalogProps =
-        ImmutableMap.<String, String>builder()
-            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .put("warehouse", warehouse.location)
-            .build();
-
-    IcebergCatalogConfig catalogConfig =
-        IcebergCatalogConfig.builder()
.setCatalogName("name") - .setCatalogProperties(catalogProps) - .build(); - - PCollection output = - testPipeline - .apply(IcebergIO.readRows(catalogConfig).from(tableId)) - .apply(ParDo.of(new PrintRow())) - .setCoder(RowCoder.of(beamSchema)); - - final Row[] expectedRows = - recordData.stream() - .map(data -> icebergGenericRecord(TestFixtures.NESTED_SCHEMA.asStruct(), data)) - .map(record -> IcebergUtils.icebergRecordToBeamRow(beamSchema, record)) - .toArray(Row[]::new); - - PAssert.that(output) - .satisfies( - (Iterable rows) -> { - assertThat(rows, containsInAnyOrder(expectedRows)); - return null; - }); - - testPipeline.run(); - } - - @SuppressWarnings("unchecked") - public static GenericRecord avroGenericRecord( - org.apache.avro.Schema schema, Map values) { - GenericRecord record = new GenericData.Record(schema); - for (org.apache.avro.Schema.Field field : schema.getFields()) { - Object rawValue = values.get(field.name()); - Object avroValue = - rawValue instanceof Map - ? avroGenericRecord(field.schema(), (Map) rawValue) - : rawValue; - record.put(field.name(), avroValue); - } - return record; - } - - @SuppressWarnings("unchecked") - public static Record icebergGenericRecord(Types.StructType type, Map values) { - org.apache.iceberg.data.GenericRecord record = - org.apache.iceberg.data.GenericRecord.create(type); - for (Types.NestedField field : type.fields()) { - Object rawValue = values.get(field.name()); - Object value = - rawValue instanceof Map - ? icebergGenericRecord(field.type().asStructType(), (Map) rawValue) - : rawValue; - record.setField(field.name(), value); - } - return record; - } - - public static String createParquetFile(org.apache.avro.Schema schema, List records) - throws IOException { - - File tempFile = createTempFile(); - Path file = new Path(tempFile.getPath()); - - AvroParquetWriter.Builder builder = AvroParquetWriter.builder(file); - ParquetWriter parquetWriter = builder.withSchema(schema).build(); - for (GenericRecord record : records) { - parquetWriter.write(record); - } - parquetWriter.close(); - - return tempFile.getPath(); - } - - private static File createTempFile() throws IOException { - File tempFile = File.createTempFile(ScanSourceTest.class.getSimpleName(), ".tmp"); - tempFile.deleteOnExit(); - boolean unused = tempFile.delete(); - return tempFile; - } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java index d7a6cd34838c..9352123b5c77 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java @@ -163,10 +163,6 @@ public Table createTable( return catalog.createTable(tableId, schema, partitionSpec); } - public Catalog.TableBuilder buildTable(TableIdentifier tableId, Schema schema) { - return catalog.buildTable(tableId, schema); - } - public Table loadTable(TableIdentifier tableId) { return catalog.loadTable(tableId); } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java index a3ab3c8b50d4..a2ca86d1b5a2 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java @@ -36,13 +36,6 @@ public class TestFixtures { new Schema( 
required(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get())); - public static final Schema NESTED_SCHEMA = - new Schema( - required(1, "id", Types.LongType.get()), - optional(2, "data", Types.StringType.get()), - optional( - 3, "metadata", Types.StructType.of(optional(4, "source", Types.StringType.get())))); - public static final List> FILE1SNAPSHOT1_DATA = ImmutableList.of( ImmutableMap.of("id", 0L, "data", "clarification"),