SimpleMappedReader no longer closes its ReaderSupplier
jbellis committed Jan 10, 2025
1 parent f9d0014 commit 9613109
Showing 12 changed files with 103 additions and 109 deletions.
6 changes: 4 additions & 2 deletions README.md
@@ -174,10 +174,12 @@ Compressing the vectors with product quantization is done as follows:

Then we can wire up the compressed vectors to a two-phase search by getting the fast ApproximateScoreFunction from PQVectors, and the Reranker from the index View:
```java
ReaderSupplier rs = ReaderSupplierFactor.open(indexPath);
ReaderSupplier rs = ReaderSupplierFactory.open(indexPath);
OnDiskGraphIndex index = OnDiskGraphIndex.load(rs);
// load the PQVectors that we just wrote to disk
try (RandomAccessReader in = new SimpleMappedReader(pqPath)) {
try (ReaderSupplier pqSupplier = ReaderSupplierFactory.open(pqPath);
RandomAccessReader in = pqSupplier.get())
{
PQVectors pqv = PQVectors.load(in);
// SearchScoreProvider that does a first pass with the loaded-in-memory PQVectors,
// then reranks with the exact vectors that are stored on disk in the index
8 changes: 4 additions & 4 deletions UPGRADING.md
@@ -7,13 +7,13 @@
performance.

## API changes
- MemorySegmentReader.Supplier must now be explicitly closed, instead of being
closed by the first Reader created from it.
- MemorySegmentReader.Supplier and SimpleMappedReader.Supplier must now be explicitly closed, instead of being
closed by the first Reader created from them.
- OnDiskGraphIndex no longer closes its ReaderSupplier
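
A minimal sketch of the resulting lifecycle, using the `ReaderSupplierFactory` and `OnDiskGraphIndex` names from the README example (`indexPath` is assumed to point at a written index):

```java
// The caller now owns the supplier and must close it explicitly.
try (ReaderSupplier rs = ReaderSupplierFactory.open(indexPath)) {
    OnDiskGraphIndex index = OnDiskGraphIndex.load(rs);
    // ... search against index ...
}   // neither the index nor readers obtained from rs will close rs
```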

# Upgrading from 3.0.x to 3.0.6
### API changes in 3.0.6

## API changes
These were released in 3.0.6 but are spiritually part of 4.0.

- `VectorCompressor.encodeAll()` now returns a `CompressedVectors` object instead of a `ByteSequence<?>[]`.
This provides better encapsulation of the compression functionality while also allowing for more efficient
@@ -34,8 +34,6 @@
public class SimpleMappedReader extends ByteBufferReader {
private static final Logger LOG = Logger.getLogger(SimpleMappedReader.class.getName());

private static final Unsafe unsafe = getUnsafe();

private static Unsafe getUnsafe() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
@@ -47,60 +45,45 @@ private static Unsafe getUnsafe() {
}
}

public SimpleMappedReader(Path path) throws IOException {
this(path.toString());
}

public SimpleMappedReader(String name) throws IOException {
this(getMappedByteBuffer(name));
}

private SimpleMappedReader(MappedByteBuffer mbb) {
SimpleMappedReader(MappedByteBuffer mbb) {
super(mbb);
}

private static MappedByteBuffer getMappedByteBuffer(String name) throws IOException {
var raf = new RandomAccessFile(name, "r");
if (raf.length() > Integer.MAX_VALUE) {
throw new RuntimeException("MappedRandomAccessReader doesn't support large files");
}
MappedByteBuffer mbb = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, raf.length());
mbb.load();
raf.close();
return mbb;
}

@Override
public void close() {
if (unsafe != null) {
try {
unsafe.invokeCleaner(bb);
} catch (IllegalArgumentException e) {
// empty catch, this was a duplicated/indirect buffer or
// otherwise not cleanable
}
}
}

public SimpleMappedReader duplicate() {
return new SimpleMappedReader((MappedByteBuffer) bb.duplicate());
// Individual readers don't close anything
}

public static class Supplier implements ReaderSupplier {
private final SimpleMappedReader smr;
private final MappedByteBuffer buffer;
private static final Unsafe unsafe = getUnsafe();

public Supplier(Path path) throws IOException {
smr = new SimpleMappedReader(path);
try (var raf = new RandomAccessFile(path.toString(), "r")) {
if (raf.length() > Integer.MAX_VALUE) {
throw new RuntimeException("SimpleMappedReader doesn't support files above 2GB");
}
this.buffer = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, raf.length());
this.buffer.load();
}
}

@Override
public RandomAccessReader get() {
return smr.duplicate();
public SimpleMappedReader get() {
return new SimpleMappedReader((MappedByteBuffer) buffer.duplicate());
}

@Override
public void close() {
smr.close();
if (unsafe != null) {
try {
unsafe.invokeCleaner(buffer);
} catch (IllegalArgumentException e) {
// empty catch, this was a duplicated/indirect buffer or
// otherwise not cleanable
}
}
}
}
}
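
The net effect of the refactor above: the `Supplier` owns the shared `MappedByteBuffer` and is the only object whose `close()` releases it, while each reader returned by `get()` wraps a cheap `duplicate()` of that buffer. A minimal usage sketch (the file name is illustrative):

```java
try (var supplier = new SimpleMappedReader.Supplier(Path.of("pq.bin"))) {
    try (RandomAccessReader r1 = supplier.get();    // independent read position,
         RandomAccessReader r2 = supplier.get()) {  // same underlying mapping
        // concurrent searches can each hold their own reader
    }   // reader close() is now a no-op
}       // Supplier.close() invokes the cleaner and unmaps the file
```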
@@ -17,13 +17,17 @@
package io.github.jbellis.jvector.quantization;

import io.github.jbellis.jvector.vector.VectorizationProvider;
import java.util.concurrent.atomic.AtomicInteger;
import io.github.jbellis.jvector.vector.types.ByteSequence;
import io.github.jbellis.jvector.vector.types.VectorFloat;
import io.github.jbellis.jvector.vector.types.VectorTypeSupport;

import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Math.max;

/**
* A threadsafe mutable PQVectors implementation that grows dynamically as needed.
*/
public class MutablePQVectors extends PQVectors implements MutableCompressedVectors<VectorFloat<?>> {
private static final VectorTypeSupport vectorTypeSupport = VectorizationProvider.getInstance().getVectorTypeSupport();

@@ -19,7 +19,6 @@
import io.github.jbellis.jvector.disk.RandomAccessReader;
import io.github.jbellis.jvector.disk.ReaderSupplier;
import io.github.jbellis.jvector.disk.ReaderSupplierFactory;
import io.github.jbellis.jvector.disk.SimpleMappedReader;
import io.github.jbellis.jvector.example.util.SiftLoader;
import io.github.jbellis.jvector.graph.GraphIndex;
import io.github.jbellis.jvector.graph.GraphIndexBuilder;
@@ -158,11 +157,12 @@ public static void siftPersisted(List<VectorFloat<?>> baseVectors, List<VectorFl

// on-disk indexes require a ReaderSupplier (not just a Reader) because we will want it to
// open additional readers for searching
ReaderSupplier rs = ReaderSupplierFactory.open(indexPath);
OnDiskGraphIndex index = OnDiskGraphIndex.load(rs);
// measure our recall against the (exactly computed) ground truth
Function<VectorFloat<?>, SearchScoreProvider> sspFactory = q -> SearchScoreProvider.exact(q, VectorSimilarityFunction.EUCLIDEAN, ravv);
testRecall(index, queryVectors, groundTruth, sspFactory);
try (ReaderSupplier rs = ReaderSupplierFactory.open(indexPath)) {
OnDiskGraphIndex index = OnDiskGraphIndex.load(rs);
// measure our recall against the (exactly computed) ground truth
Function<VectorFloat<?>, SearchScoreProvider> sspFactory = q -> SearchScoreProvider.exact(q, VectorSimilarityFunction.EUCLIDEAN, ravv);
testRecall(index, queryVectors, groundTruth, sspFactory);
}
}

// diskann-style index with PQ
@@ -190,20 +190,23 @@ public static void siftDiskAnn(List<VectorFloat<?>> baseVectors, List<VectorFloa
pqv.write(out);
}

ReaderSupplier rs = ReaderSupplierFactory.open(indexPath);
OnDiskGraphIndex index = OnDiskGraphIndex.load(rs);
// load the PQVectors that we just wrote to disk
try (RandomAccessReader in = new SimpleMappedReader(pqPath)) {
PQVectors pqv = PQVectors.load(in);
// SearchScoreProvider that does a first pass with the loaded-in-memory PQVectors,
// then reranks with the exact vectors that are stored on disk in the index
Function<VectorFloat<?>, SearchScoreProvider> sspFactory = q -> {
ApproximateScoreFunction asf = pqv.precomputedScoreFunctionFor(q, VectorSimilarityFunction.EUCLIDEAN);
ExactScoreFunction reranker = index.getView().rerankerFor(q, VectorSimilarityFunction.EUCLIDEAN);
return new SearchScoreProvider(asf, reranker);
};
// measure our recall against the (exactly computed) ground truth
testRecall(index, queryVectors, groundTruth, sspFactory);
try (ReaderSupplier rs = ReaderSupplierFactory.open(indexPath)) {
OnDiskGraphIndex index = OnDiskGraphIndex.load(rs);
// load the PQVectors that we just wrote to disk
try (ReaderSupplier pqSupplier = ReaderSupplierFactory.open(pqPath);
RandomAccessReader in = pqSupplier.get())
{
PQVectors pqv = PQVectors.load(in);
// SearchScoreProvider that does a first pass with the loaded-in-memory PQVectors,
// then reranks with the exact vectors that are stored on disk in the index
Function<VectorFloat<?>, SearchScoreProvider> sspFactory = q -> {
ApproximateScoreFunction asf = pqv.precomputedScoreFunctionFor(q, VectorSimilarityFunction.EUCLIDEAN);
ExactScoreFunction reranker = index.getView().rerankerFor(q, VectorSimilarityFunction.EUCLIDEAN);
return new SearchScoreProvider(asf, reranker);
};
// measure our recall against the (exactly computed) ground truth
testRecall(index, queryVectors, groundTruth, sspFactory);
}
}
}

@@ -255,7 +258,9 @@ public static void siftDiskAnnLTM(List<VectorFloat<?>> baseVectors, List<VectorF
// searching the index does not change
ReaderSupplier rs = ReaderSupplierFactory.open(indexPath);
OnDiskGraphIndex index = OnDiskGraphIndex.load(rs);
try (RandomAccessReader in = new SimpleMappedReader(pqPath)) {
try (ReaderSupplier pqSupplier = ReaderSupplierFactory.open(pqPath);
RandomAccessReader in = pqSupplier.get())
{
var pqvSearch = PQVectors.load(in);
Function<VectorFloat<?>, SearchScoreProvider> sspFactory = q -> {
ApproximateScoreFunction asf = pqvSearch.precomputedScoreFunctionFor(q, VectorSimilarityFunction.EUCLIDEAN);
@@ -316,7 +321,9 @@ public static void siftDiskAnnLTMWithNVQ(List<VectorFloat<?>> baseVectors, List<
// searching the index does not change
ReaderSupplier rs = ReaderSupplierFactory.open(indexPath);
OnDiskGraphIndex index = OnDiskGraphIndex.load(rs);
try (RandomAccessReader in = new SimpleMappedReader(pqPath)) {
try (ReaderSupplier pqSupplier = ReaderSupplierFactory.open(pqPath);
RandomAccessReader in = pqSupplier.get())
{
var pqvSearch = PQVectors.load(in);
Function<VectorFloat<?>, SearchScoreProvider> sspFactory = q -> {
ApproximateScoreFunction asf = pqvSearch.precomputedScoreFunctionFor(q, VectorSimilarityFunction.EUCLIDEAN);
@@ -120,8 +120,8 @@ public void testSaveAndLoad() throws IOException {
}

builder = newBuilder.get();
try(var reader = new SimpleMappedReader(indexDataPath)) {
builder.load(reader);
try(var readerSupplier = new SimpleMappedReader.Supplier(indexDataPath)) {
builder.load(readerSupplier.get());
}

assertEquals(ravv.size(), builder.graph.size());
@@ -115,8 +115,8 @@ public void testCleanup() throws IOException {
graph.save(out);
}
var b2 = new GraphIndexBuilder(ravv, VectorSimilarityFunction.COSINE, 2, 10, 1.0f, 1.0f);
try (var marr = new SimpleMappedReader(outputPath.toAbsolutePath().toString())) {
b2.load(marr);
try (var readerSupplier = new SimpleMappedReader.Supplier(outputPath)) {
b2.load(readerSupplier.get());
}
var reloadedGraph = b2.getGraph();
assertGraphEquals(graph, reloadedGraph);
@@ -56,8 +56,8 @@ public void tearDown() {

@Test
public void testGraphCacheLoading() throws Exception {
try (var marr = new SimpleMappedReader(onDiskGraphIndexPath.toAbsolutePath().toString());
var onDiskGraph = OnDiskGraphIndex.load(marr::duplicate))
try (var readerSupplier = new SimpleMappedReader.Supplier(onDiskGraphIndexPath);
var onDiskGraph = OnDiskGraphIndex.load(readerSupplier))
{
var none = GraphCache.load(onDiskGraph, -1);
assertEquals(0, none.ramBytesUsed());
@@ -71,8 +71,8 @@ public void testSimpleGraphs() throws Exception {
var outputPath = testDirectory.resolve("test_graph_" + graph.getClass().getSimpleName());
var ravv = new TestVectorGraph.CircularFloatVectorValues(graph.size());
TestUtil.writeGraph(graph, ravv, outputPath);
try (var marr = new SimpleMappedReader(outputPath.toAbsolutePath().toString());
var onDiskGraph = OnDiskGraphIndex.load(marr::duplicate))
try (var readerSupplier = new SimpleMappedReader.Supplier(outputPath.toAbsolutePath());
var onDiskGraph = OnDiskGraphIndex.load(readerSupplier))
{
TestUtil.assertGraphEquals(graph, onDiskGraph);
try (var onDiskView = onDiskGraph.getView()) {
@@ -114,8 +114,8 @@ public void testRenumberingOnDelete() throws IOException {
var outputPath = testDirectory.resolve("renumbered_graph");
OnDiskGraphIndex.write(original, ravv, oldToNewMap, outputPath);
// check that written graph ordinals match the new ones
try (var marr = new SimpleMappedReader(outputPath.toAbsolutePath().toString());
var onDiskGraph = OnDiskGraphIndex.load(marr::duplicate);
try (var readerSupplier = new SimpleMappedReader.Supplier(outputPath.toAbsolutePath());
var onDiskGraph = OnDiskGraphIndex.load(readerSupplier);
var onDiskView = onDiskGraph.getView())
{
// entry point renumbering
@@ -146,8 +146,8 @@ public void testReorderingRenumbering() throws IOException {
var outputPath = testDirectory.resolve("renumbered_graph");
OnDiskGraphIndex.write(original, ravv, oldToNewMap, outputPath);
// check that written graph ordinals match the new ones
try (var marr = new SimpleMappedReader(outputPath.toAbsolutePath().toString());
var onDiskGraph = OnDiskGraphIndex.load(marr::duplicate);
try (var readerSupplier = new SimpleMappedReader.Supplier(outputPath);
var onDiskGraph = OnDiskGraphIndex.load(readerSupplier);
var onDiskView = onDiskGraph.getView())
{
assertEquals(onDiskView.getVector(0), ravv.getVector(2));
@@ -175,8 +175,8 @@ public void testReorderingWithHoles() throws IOException {
var outputPath = testDirectory.resolve("renumbered_graph");
OnDiskGraphIndex.write(original, ravv, oldToNewMap, outputPath);
// check that written graph ordinals match the new ones
try (var marr = new SimpleMappedReader(outputPath.toAbsolutePath().toString());
var onDiskGraph = OnDiskGraphIndex.load(marr::duplicate);
try (var readerSupplier = new SimpleMappedReader.Supplier(outputPath.toAbsolutePath());
var onDiskGraph = OnDiskGraphIndex.load(readerSupplier);
var onDiskView = onDiskGraph.getView())
{
assertEquals(onDiskView.getVector(0), ravv.getVector(2));
@@ -201,8 +201,8 @@ public void testLargeGraph() throws Exception
var ravv = new TestVectorGraph.CircularFloatVectorValues(graph.size());
TestUtil.writeGraph(graph, ravv, outputPath);

try (var marr = new SimpleMappedReader(outputPath.toAbsolutePath().toString());
var onDiskGraph = OnDiskGraphIndex.load(marr::duplicate);
try (var readerSupplier = new SimpleMappedReader.Supplier(outputPath.toAbsolutePath());
var onDiskGraph = OnDiskGraphIndex.load(readerSupplier);
var cachedOnDiskGraph = new CachingGraphIndex(onDiskGraph))
{
TestUtil.assertGraphEquals(graph, onDiskGraph);
@@ -220,8 +220,8 @@ public void testV0Read() throws IOException {
public void testV0Read() throws IOException {
// using a random graph from testLargeGraph generated on old version
var file = new File("resources/version0.odgi");
try (var smr = new SimpleMappedReader(file.getAbsolutePath());
var onDiskGraph = OnDiskGraphIndex.load(smr::duplicate);
try (var readerSupplier = new SimpleMappedReader.Supplier(file.toPath());
var onDiskGraph = OnDiskGraphIndex.load(readerSupplier);
var onDiskView = onDiskGraph.getView())
{
assertEquals(32, onDiskGraph.maxDegree);
@@ -241,8 +241,8 @@ public void testV0Write() throws IOException {
var fileIn = new File("resources/version0.odgi");
var fileOut = File.createTempFile("version0", ".odgi");

try (var smr = new SimpleMappedReader(fileIn.getAbsolutePath());
var graph = OnDiskGraphIndex.load(smr::duplicate);
try (var readerSupplier = new SimpleMappedReader.Supplier(fileIn.toPath());
var graph = OnDiskGraphIndex.load(readerSupplier);
var view = graph.getView())
{
try (var writer = new OnDiskGraphIndexWriter.Builder(graph, fileOut.toPath())
@@ -265,8 +265,8 @@ public void testV0WriteIncremental() throws IOException {
var fileIn = new File("resources/version0.odgi");
var fileOut = File.createTempFile("version0", ".odgi");

try (var smr = new SimpleMappedReader(fileIn.getAbsolutePath());
var graph = OnDiskGraphIndex.load(smr::duplicate);
try (var readerSupplier = new SimpleMappedReader.Supplier(fileIn.toPath());
var graph = OnDiskGraphIndex.load(readerSupplier);
var view = graph.getView())
{
try (var writer = new OnDiskGraphIndexWriter.Builder(graph, fileOut.toPath())
@@ -340,10 +340,10 @@ public void testIncrementalWrites() throws IOException {
}

// graph and vectors should be identical
try (var bulkMarr = new SimpleMappedReader(bulkPath.toAbsolutePath().toString());
var bulkGraph = OnDiskGraphIndex.load(bulkMarr::duplicate);
var incrementalMarr = new SimpleMappedReader(incrementalFadcPath.toAbsolutePath().toString());
var incrementalGraph = OnDiskGraphIndex.load(incrementalMarr::duplicate);
try (var bulkReaderSupplier = new SimpleMappedReader.Supplier(bulkPath.toAbsolutePath());
var bulkGraph = OnDiskGraphIndex.load(bulkReaderSupplier);
var incrementalReaderSupplier = new SimpleMappedReader.Supplier(incrementalFadcPath.toAbsolutePath());
var incrementalGraph = OnDiskGraphIndex.load(incrementalReaderSupplier);
var incrementalView = incrementalGraph.getView())
{
assertTrue(OnDiskGraphIndex.areHeadersEqual(incrementalGraph, bulkGraph));
@@ -62,8 +62,8 @@ public void testFusedGraph() throws Exception {

TestUtil.writeFusedGraph(graph, ravv, pqv, outputPath);

try (var marr = new SimpleMappedReader(outputPath.toAbsolutePath().toString());
var onDiskGraph = OnDiskGraphIndex.load(marr::duplicate, 0);
try (var readerSupplier = new SimpleMappedReader.Supplier(outputPath);
var onDiskGraph = OnDiskGraphIndex.load(readerSupplier, 0);
var cachedOnDiskGraph = new CachingGraphIndex(onDiskGraph, 5))
{
TestUtil.assertGraphEquals(graph, onDiskGraph);