Skip to content

Commit

Permalink
HCD-74: Fix CorruptSSTableException in UCS with compression (#1602)
Browse files Browse the repository at this point in the history
For compressed sstables, fix the partition ending position calculation to prevent
going out of bounds when the partition end falls on a chunk boundary.
  • Loading branch information
szymon-miezal authored Feb 26, 2025
1 parent 5ed91d7 commit 303c520
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1557,7 +1557,14 @@ public long onDiskSizeForPartitionPositions(Collection<PartitionPositionBounds>
long lastEnd = 0;
for (PartitionPositionBounds position : positionBounds)
{
long upperChunkEnd = compressionMetadata.chunkFor(position.upperPosition).chunkEnd();
assert position.lowerPosition >= 0 : "the partition lower cannot be negative";
if (position.upperPosition == position.lowerPosition)
{
continue;
}
assert position.upperPosition >= position.lowerPosition : "the partition upper position cannot be lower than lower position";

long upperChunkEnd = compressionMetadata.chunkFor(position.upperPosition - 1).chunkEnd();
long lowerChunkStart = compressionMetadata.chunkFor(position.lowerPosition).offset;
if (lowerChunkStart < lastEnd) // if regions include the same chunk, count it only once
lowerChunkStart = lastEnd;
Expand Down
62 changes: 59 additions & 3 deletions test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader.PartitionPositionBounds;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
Expand All @@ -102,6 +103,8 @@
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
import static org.apache.cassandra.io.sstable.format.SSTableReader.selectOnlyBigTableReaders;
import static org.apache.cassandra.schema.CompressionParams.DEFAULT_CHUNK_LENGTH;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -195,7 +198,7 @@ public void testGetPositionsForRanges()
// confirm that positions increase continuously
SSTableReader sstable = store.getLiveSSTables().iterator().next();
long previous = -1;
for (SSTableReader.PartitionPositionBounds section : sstable.getPositionsForRanges(ranges))
for (PartitionPositionBounds section : sstable.getPositionsForRanges(ranges))
{
assert previous <= section.lowerPosition : previous + " ! < " + section.lowerPosition;
assert section.lowerPosition < section.upperPosition : section.lowerPosition + " ! < " + section.upperPosition;
Expand Down Expand Up @@ -400,6 +403,59 @@ public void testOnDiskSizeForRanges()
}
}

@Test
public void testAssertionsOnDiskSizeForPartitionPositions()
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_COMPRESSED);
SSTableReader sstable = getNewSSTable(cfs);
partitioner = sstable.getPartitioner();

assertThatThrownBy(() -> sstable.onDiskSizeForPartitionPositions(Collections.singleton(new PartitionPositionBounds(-1, 0))))
.isInstanceOf(AssertionError.class);
assertThatThrownBy(() -> sstable.onDiskSizeForPartitionPositions(Collections.singleton(new PartitionPositionBounds(2, 1))))
.isInstanceOf(AssertionError.class);
}

@Test
public void testDiskSizeForEmptyPosition()
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_COMPRESSED);
SSTableReader sstable = getNewSSTable(cfs);
partitioner = sstable.getPartitioner();

long size = sstable.onDiskSizeForPartitionPositions(Collections.singleton(new PartitionPositionBounds(0, 0)));
assertEquals(0, size);
}

@Test
public void testDoNotFailOnChunkEndingPosition()
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_COMPRESSED);

// we want the last row to align to the end of chunk
int rowCount = DEFAULT_CHUNK_LENGTH;

// insert data and compact to a single sstable
for (int j = 0; j < rowCount; j++)
{
new RowUpdateBuilder(cfs.metadata(), 15000, k0(j))
.clustering("0")
.add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
.build()
.applyUnsafe();
}
cfs.forceBlockingFlush(UNIT_TESTS);
cfs.forceMajorCompaction();
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
partitioner = sstable.getPartitioner();

long totalDiskSizeForTheWholeRange = onDiskSizeForRanges(sstable, Collections.singleton(new Range<>(t(cut(k0(0), 1)), t0(rowCount))));
assertEquals(sstable.onDiskLength(), totalDiskSizeForTheWholeRange);
}

long onDiskSizeForRanges(SSTableReader sstable, Collection<Range<Token>> ranges)
{
return sstable.onDiskSizeForPartitionPositions(sstable.getPositionsForRanges(ranges));
Expand Down Expand Up @@ -555,7 +611,7 @@ public void testGetPositionsForRangesWithKeyCache()
long p6 = sstable.getPosition(k(6), SSTableReader.Operator.EQ).position;
long p7 = sstable.getPosition(k(7), SSTableReader.Operator.EQ).position;

SSTableReader.PartitionPositionBounds p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0);
PartitionPositionBounds p = sstable.getPositionsForRanges(makeRanges(t(2), t(6))).get(0);

// range are start exclusive so we should start at 3
assert p.lowerPosition == p3;
Expand Down Expand Up @@ -876,7 +932,7 @@ public void testGetPositionsForRangesFromTableOpenedForBulkLoading()
ranges.add(new Range<Token>(t(98), t(99)));

SSTableReader sstable = store.getLiveSSTables().iterator().next();
List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(ranges);
List<PartitionPositionBounds> sections = sstable.getPositionsForRanges(ranges);
assert sections.size() == 1 : "Expected to find range in sstable" ;

// re-open the same sstable as it would be during bulk loading
Expand Down

0 comments on commit 303c520

Please sign in to comment.