Task: Add test for simulating OOM error during compaction #3

Open · wants to merge 4 commits into main

@@ -164,6 +164,143 @@ public void testEmptyTable() {

assertThat(table.currentSnapshot()).as("Table must stay empty").isNull();
}
@Test
public void testCompactionMemoryUsage() {
  PartitionSpec spec = PartitionSpec.unpartitioned();
  Map<String, String> options = Maps.newHashMap();
  Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create an empty table
  int totalFiles = 0;

  // Write records and run a compaction job in each iteration
  for (int i = 0; i < 10000; i++) {
    long memoryBefore = getMemoryUsage(); // memory in use prior to compaction
    writeRecords(10000, SCALE * 3); // append large files in each iteration
    totalFiles += 10000;
    System.out.println(totalFiles);
    // Check that the table has the expected number of files
    // shouldHaveFiles(table, totalFiles);
    try {
      basicRewrite(table).execute(); // try to compact with this many files
      long memoryAfter = getMemoryUsage(); // memory in use after the compaction run
      long memoryConsumed = memoryAfter - memoryBefore; // memory consumed by this operation
      System.out.println("Memory used during compaction: " + memoryConsumed + " bytes");

    } catch (OutOfMemoryError e) {
      // Catch the OutOfMemoryError and report the most recent memory usage
      System.out.println("Caught OutOfMemoryError as expected: " + e.getMessage());
      long memoryAfter = getMemoryUsage(); // memory in use after the failed compaction run
      long memoryConsumed = memoryAfter - memoryBefore;
      System.out.println("Memory used during compaction: " + memoryConsumed + " bytes");
    } catch (Exception e) {
      // Any other exception is unexpected; report it
      System.out.println("Caught unexpected exception: " + e.getMessage());
    }
  }
}
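
// getMemoryUsage() is referenced above but is not part of this diff. A minimal
// sketch of what it could look like (an assumption, not necessarily this PR's
// actual helper), using the standard java.lang.Runtime API:
private long getMemoryUsage() {
  Runtime runtime = Runtime.getRuntime();
  return runtime.totalMemory() - runtime.freeMemory(); // heap currently in use, in bytes
}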

// Read-and-compact helper method
private RewritePositionDeleteFilesSparkAction basicRewritePositionalDeletes(Table curTable) {
  curTable.refresh(); // update the table to its newest snapshot
  return actions().rewritePositionDeletes(curTable);
}
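
// basicRewrite(table), called in testCompactionMemoryUsage above, is likewise assumed
// to exist elsewhere in this test class. A plausible sketch, mirroring the pattern in
// Iceberg's own Spark action tests (MIN_INPUT_FILES = "1" forces a rewrite regardless
// of how few input files there are); treat the option constant as an assumption:
private RewriteDataFilesSparkAction basicRewrite(Table table) {
  table.refresh(); // rewrite against the newest snapshot
  return actions().rewriteDataFiles(table).option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1");
}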

@Test
public void testCompactionSdataLdelete() throws Exception {
  PartitionSpec spec = PartitionSpec.unpartitioned(); // determines how we are partitioning
  Map<String, String> options = Maps.newHashMap();
  Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create an empty table

  // Create a large number of records
  List<Record> records = Lists.newArrayList();
  GenericRecord record = GenericRecord.create(table.schema());

  for (int i = 1; i <= 100_000; i++) {
    records.add(record.copy("id", i, "data", "data_" + i));
  }

  // Create a single data file holding all the records
  this.dataFile = FileHelpers.writeDataFile(
      table,
      Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
      TestHelpers.Row.of(0),
      records);
  table.newAppend().appendFile(dataFile).commit(); // commit a snapshot of the table

  // Create a large number of equality delete records
  Record dataDelete = GenericRecord.create(deleteRowSchema);
  List<Record> dataDeletes = Lists.newArrayList();
  for (int i = 1; i <= 50_000; i++) {
    dataDeletes.add(dataDelete.copy("data", "data_" + i));
  }

  // Write many delete files, each containing every delete record
  RowDelta r = table.newRowDelta();
  for (int i = 1; i <= 50_000; i++) {
    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
        table,
        Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
        TestHelpers.Row.of(0),
        dataDeletes,
        deleteRowSchema);
    r.addDeletes(eqDeletes);
  }
  r.commit();

  // Compaction job
  basicRewritePositionalDeletes(table).execute();
}
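
// Note: rewritePositionDeletes compacts *position* delete files, while the test above
// commits equality deletes, so the action may find nothing to rewrite. A hedged sketch
// of committing genuine position deletes instead, using FileHelpers' position-delete
// overload (which returns Pair<DeleteFile, CharSequenceSet>); the helper name here is
// hypothetical:
private void commitPositionDeletes(Table table, DataFile dataFile) throws IOException {
  List<Pair<CharSequence, Long>> positionDeletes = Lists.newArrayList();
  for (long pos = 0; pos < 50_000; pos++) {
    positionDeletes.add(Pair.of(dataFile.path(), pos)); // delete the row at this position of dataFile
  }
  DeleteFile posDeletes = FileHelpers.writeDeleteFile(
      table,
      Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
      TestHelpers.Row.of(0),
      positionDeletes).first();
  table.newRowDelta().addDeletes(posDeletes).commit();
}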

@Test
public void testCompactionLdataLdeletes() throws Exception {
  PartitionSpec spec = PartitionSpec.unpartitioned(); // determines how we are partitioning
  Map<String, String> options = Maps.newHashMap();
  Table table = TABLES.create(SCHEMA, spec, options, tableLocation); // create an empty table

  // Create a large number of records
  List<Record> records = Lists.newArrayList();
  GenericRecord record = GenericRecord.create(table.schema());

  for (int i = 1; i <= 100_000; i++) {
    records.add(record.copy("id", i, "data", "data_" + i));
  }

  // Create many data files, each containing every record
  AppendFiles recAppend = table.newAppend();
  for (int i = 1; i <= 100_000; i++) {
    this.dataFile = FileHelpers.writeDataFile(
        table,
        Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
        TestHelpers.Row.of(0),
        records);
    recAppend.appendFile(dataFile);
  }
  recAppend.commit(); // commit a snapshot of the table

  // Create a large number of equality delete records
  Record dataDelete = GenericRecord.create(deleteRowSchema);
  List<Record> dataDeletes = Lists.newArrayList();
  for (int i = 1; i <= 50_000; i++) {
    dataDeletes.add(dataDelete.copy("data", "data_" + i));
  }

  // Write many delete files, each containing every delete record
  RowDelta r = table.newRowDelta();
  for (int i = 1; i <= 50_000; i++) {
    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
        table,
        Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
        TestHelpers.Row.of(0),
        dataDeletes,
        deleteRowSchema);
    r.addDeletes(eqDeletes);
  }
  r.commit();

  // Compaction job
  basicRewritePositionalDeletes(table).execute();
}
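
// The methods above assume class fields (TABLES, SCHEMA, SCALE, tableLocation, temp,
// dataFile, deleteRowSchema) and imports from the surrounding test class. A sketch of
// the imports the new code relies on (the relocated Guava packages follow Iceberg's
// convention; adjust if this class imports Guava directly):
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Pair;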

@Test
public void testBinPackUnpartitionedTable() {