Skip to content

Commit

Permalink
Test copy-on-write (COW) and merge-on-read (MOR); MOR needs to add merger impls
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Vexler committed Oct 18, 2024
1 parent 6efda9c commit 08b6895
Showing 1 changed file with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -507,18 +507,25 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
assertEquals(25, metaClient.getActiveTimeline.countInstants())
}

@Test
def testStructuredStreamingWithMergeMode(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStructuredStreamingWithMergeMode(isCow: Boolean): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
// First chunk of data
val records1 = recordsToStrings(dataGen.generateInserts("000", 10)).asScala.toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
val tableType = if (isCow) {
"COPY_ON_WRITE"
} else {
"MERGE_ON_READ"
}
val opts = commonOpts ++ Map(DataSourceWriteOptions.OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.TABLE_TYPE.key() -> tableType,
DataSourceWriteOptions.RECORD_MERGE_MODE.key() -> RecordMergeMode.CUSTOM.name(),
DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key() -> HoodieSparkDeleteRecordMerger.DELETE_MERGER_STRATEGY,
DataSourceWriteOptions.RECORD_MERGER_IMPLS.key() -> classOf[HoodieSparkDeleteRecordMerger].getName,
HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> "PARQUET")
HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> "parquet")
streamingWrite(inputDF1.schema, sourcePath, destPath, opts)


Expand All @@ -529,7 +536,9 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {

// The merger deletes any record that also appears in records2, so we remove those from the original batch using except.
val expectedFinalRecords = inputDF1.select("_row_key", "partition_path").except(inputDF2.select("_row_key", "partition_path"))
val finalRecords = spark.read.format("hudi").load(destPath).select("_row_key", "partition_path")
val finalRecords = spark.read.format("hudi")
.option(DataSourceWriteOptions.RECORD_MERGER_IMPLS.key(), classOf[HoodieSparkDeleteRecordMerger].getName)
.load(destPath).select("_row_key", "partition_path")
assertEquals(expectedFinalRecords.count(), finalRecords.count())
assertEquals(0, expectedFinalRecords.except(finalRecords).count())
}
Expand Down

0 comments on commit 08b6895

Please sign in to comment.