Store compressed vectors in dense ByteSequence for PQVectors #370

Merged: 9 commits, Dec 2, 2024
4 changes: 2 additions & 2 deletions README.md
@@ -163,9 +163,9 @@ Compressing the vectors with product quantization is done as follows:
16, // number of subspaces
256, // number of centroids per subspace
true); // center the dataset
ByteSequence<?>[] compressed = pq.encodeAll(ravv);
// Note: before jvector 3.1.0, encodeAll returned an array of ByteSequence.
PQVectors pqv = pq.encodeAll(ravv);
// write the compressed vectors to disk
PQVectors pqv = new PQVectors(pq, compressed);
pqv.write(out);
}
```
14 changes: 14 additions & 0 deletions UPGRADING.md
@@ -1,3 +1,17 @@
# Upgrading from 3.0.x to 3.1.x

## Critical API changes

- `VectorCompressor.encodeAll()` now returns a `CompressedVectors` object instead of a `ByteSequence<?>[]`.
This provides better encapsulation of the compression functionality while also allowing for more efficient
creation of the `CompressedVectors` object; a minimal before/after sketch follows this diff.
- The `ByteSequence` interface now includes an `offset()` method to provide offset information for the sequence.
Because `ByteSequence::get` returns the full backing data, the `offset()` method is necessary to determine
where a sequence's data begins in the backing array.
- `PQVectors` constructor has been updated to support immutable instances and explicit chunking parameters.
- The `VectorCompressor.createCompressedVectors(Object[])` method is now deprecated in favor of the new API that returns
`CompressedVectors` directly from `encodeAll()`.

# Upgrading from 2.0.x to 3.0.x

## Critical API changes
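To make the `encodeAll()` change concrete, here is a minimal before/after sketch of the migration, assuming a `ProductQuantization pq` and a `RandomAccessVectorValues ravv` as in the README example (the 3.0.x lines no longer compile on 3.1.x):

```java
// jvector 3.0.x: encodeAll returned raw codes; PQVectors was assembled by hand.
ByteSequence<?>[] compressed = pq.encodeAll(ravv);
PQVectors before = new PQVectors(pq, compressed);

// jvector 3.1.x: encodeAll builds and returns the CompressedVectors object directly.
PQVectors after = pq.encodeAll(ravv);
```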
@@ -63,12 +63,18 @@ public CompressedVectors createCompressedVectors(Object[] compressedVectors) {
}

@Override
public long[][] encodeAll(RandomAccessVectorValues ravv, ForkJoinPool simdExecutor) {
return simdExecutor.submit(() -> IntStream.range(0, ravv.size())
public CompressedVectors encodeAll(RandomAccessVectorValues ravv, ForkJoinPool simdExecutor) {
var cv = simdExecutor.submit(() -> IntStream.range(0, ravv.size())
.parallel()

Review comment (on `.parallel()`):

> out of curiosity: does it really help to use "parallel" here?

Reply (Owner):

> Massively.
.mapToObj(i -> encode(ravv.getVector(i)))
.mapToObj(i -> {
var vector = ravv.getVector(i);
return vector == null
? new long[compressedVectorSize() / Long.BYTES]
: encode(vector);
})
.toArray(long[][]::new))
.join();
return new BQVectors(this, cv);
}

/**
@@ -80,7 +86,13 @@ public long[][] encodeAll(RandomAccessVectorValues ravv, ForkJoinPool simdExecutor) {
public long[] encode(VectorFloat<?> v) {
int M = (int) Math.ceil(v.length() / 64.0);
long[] encoded = new long[M];
for (int i = 0; i < M; i++) {
encodeTo(v, encoded);
return encoded;
}

@Override
public void encodeTo(VectorFloat<?> v, long[] dest) {
for (int i = 0; i < dest.length; i++) {
long bits = 0;
for (int j = 0; j < 64; j++) {
int idx = i * 64 + j;
@@ -91,9 +103,8 @@ public long[] encode(VectorFloat<?> v) {
bits |= 1L << j;
}
}
encoded[i] = bits;
dest[i] = bits;
}
return encoded;
}

@Override
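For illustration, here is a self-contained sketch of the sign-bit packing that the `encode`/`encodeTo` pair above performs, using plain Java arrays instead of jvector's `VectorFloat`. The `>= 0` threshold is an assumption, since the actual comparison line is elided from the hunk:

```java
// Pack one bit per dimension into 64-bit words, little-endian within each word.
static long[] packSignBits(float[] v) {
    int M = (int) Math.ceil(v.length / 64.0); // words needed to hold v.length bits
    long[] dest = new long[M];
    for (int i = 0; i < M; i++) {
        long bits = 0;
        for (int j = 0; j < 64; j++) {
            int idx = i * 64 + j;
            if (idx >= v.length)
                break;              // the last word may be only partially filled
            if (v[idx] >= 0)
                bits |= 1L << j;    // set bit j of word i
        }
        dest[i] = bits;
    }
    return dest;
}
```

Splitting `encode` into `encode` plus `encodeTo(v, dest)` lets `encodeAll` write into preallocated storage instead of allocating a fresh array per vector.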
210 changes: 181 additions & 29 deletions jvector-base/src/main/java/io/github/jbellis/jvector/pq/PQVectors.java
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
package io.github.jbellis.jvector.pq;

import io.github.jbellis.jvector.disk.RandomAccessReader;
import io.github.jbellis.jvector.graph.RandomAccessVectorValues;
import io.github.jbellis.jvector.graph.similarity.ScoreFunction;
import io.github.jbellis.jvector.util.RamUsageEstimator;
import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
@@ -28,34 +29,71 @@

import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

public class PQVectors implements CompressedVectors {
private static final VectorTypeSupport vectorTypeSupport = VectorizationProvider.getInstance().getVectorTypeSupport();
static final int MAX_CHUNK_SIZE = Integer.MAX_VALUE - 16; // standard Java array size limit with some headroom
final ProductQuantization pq;
private final List<ByteSequence<?>> compressedVectors;
private final ByteSequence<?>[] compressedDataChunks;
private int vectorCount;
private final int vectorsPerChunk;
private final boolean mutable;

/**
* Initialize the PQVectors with an initial List of vectors. This list may be
* mutated, but caller is responsible for thread safety issues when doing so.
* Construct a PQVectors instance with the given ProductQuantization and maximum number of vectors that will be
* stored in this instance. The vectors are split into chunks to avoid exceeding the maximum array size.
* The instance is mutable and is not thread-safe.
* @param pq the ProductQuantization to use
* @param maximumVectorCount the maximum number of vectors that will be stored in this instance
*/
public PQVectors(ProductQuantization pq, List<ByteSequence<?>> compressedVectors)
public PQVectors(ProductQuantization pq, int maximumVectorCount)
{
this.pq = pq;
this.compressedVectors = compressedVectors;
this.mutable = true;
this.vectorCount = 0;

// Calculate if we need to split into multiple chunks
int compressedDimension = pq.compressedVectorSize();
long totalSize = (long) maximumVectorCount * compressedDimension;
this.vectorsPerChunk = totalSize <= MAX_CHUNK_SIZE ? maximumVectorCount : MAX_CHUNK_SIZE / compressedDimension;

int numChunks = maximumVectorCount / vectorsPerChunk;
ByteSequence<?>[] chunks = new ByteSequence<?>[numChunks];
int chunkSize = vectorsPerChunk * compressedDimension;
for (int i = 0; i < numChunks - 1; i++)
chunks[i] = vectorTypeSupport.createByteSequence(chunkSize);

// Last chunk might be smaller
int remainingVectors = maximumVectorCount - (vectorsPerChunk * (numChunks - 1));
chunks[numChunks - 1] = vectorTypeSupport.createByteSequence(remainingVectors * compressedDimension);

compressedDataChunks = chunks;
}

public PQVectors(ProductQuantization pq, ByteSequence<?>[] compressedVectors)
/**
* Construct a PQVectors instance with the given ProductQuantization and compressed data chunks. The instance is
* immutable and thread-safe, though the underlying vectors may be mutable.
* @param pq the ProductQuantization to use
* @param compressedDataChunks the compressed data chunks
* @param vectorCount the number of vectors
* @param vectorsPerChunk the number of vectors per chunk
*/
public PQVectors(ProductQuantization pq, ByteSequence<?>[] compressedDataChunks, int vectorCount, int vectorsPerChunk)
{
this(pq, List.of(compressedVectors));
this.pq = pq;
this.mutable = false;
this.compressedDataChunks = compressedDataChunks;
this.vectorCount = vectorCount;
this.vectorsPerChunk = vectorsPerChunk;
}

@Override
public int count() {
return compressedVectors.size();
return vectorCount;
}

@Override
@@ -65,10 +103,10 @@ public void write(DataOutput out, int version) throws IOException
pq.write(out, version);

// compressed vectors
out.writeInt(compressedVectors.size());
out.writeInt(vectorCount);
out.writeInt(pq.getSubspaceCount());
for (var v : compressedVectors) {
vectorTypeSupport.writeByteSequence(out, v);
for (ByteSequence<?> chunk : compressedDataChunks) {
vectorTypeSupport.writeByteSequence(out, chunk);
}
}

@@ -77,44 +115,74 @@ public static PQVectors load(RandomAccessReader in) throws IOException {
var pq = ProductQuantization.load(in);

// read the vectors
int size = in.readInt();
if (size < 0) {
throw new IOException("Invalid compressed vector count " + size);
int vectorCount = in.readInt();
if (vectorCount < 0) {
throw new IOException("Invalid compressed vector count " + vectorCount);
}
List<ByteSequence<?>> compressedVectors = new ArrayList<>(size);

int compressedDimension = in.readInt();
if (compressedDimension < 0) {
throw new IOException("Invalid compressed vector dimension " + compressedDimension);
}

for (int i = 0; i < size; i++)
{
ByteSequence<?> vector = vectorTypeSupport.readByteSequence(in, compressedDimension);
compressedVectors.add(vector);
// Calculate if we need to split into multiple chunks
long totalSize = (long) vectorCount * compressedDimension;
int vectorsPerChunk = totalSize <= MAX_CHUNK_SIZE ? vectorCount : MAX_CHUNK_SIZE / compressedDimension;

int numChunks = vectorCount / vectorsPerChunk;
ByteSequence<?>[] chunks = new ByteSequence<?>[numChunks];

for (int i = 0; i < numChunks - 1; i++) {
int chunkSize = vectorsPerChunk * compressedDimension;
chunks[i] = vectorTypeSupport.readByteSequence(in, chunkSize);
}

return new PQVectors(pq, compressedVectors);
// Last chunk might be smaller
int remainingVectors = vectorCount - (vectorsPerChunk * (numChunks - 1));
chunks[numChunks - 1] = vectorTypeSupport.readByteSequence(in, remainingVectors * compressedDimension);

return new PQVectors(pq, chunks, vectorCount, vectorsPerChunk);
}

public static PQVectors load(RandomAccessReader in, long offset) throws IOException {
in.seek(offset);
return load(in);
}

/**
* We consider two PQVectors equal when their PQs are equal and their compressed data is equal. We ignore the
* chunking strategy in the comparison since this is an implementation detail.
* @param o the object to check for equality
* @return true if the objects are equal, false otherwise
*/
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

PQVectors that = (PQVectors) o;
if (!Objects.equals(pq, that.pq)) return false;
return Objects.equals(compressedVectors, that.compressedVectors);
if (this.count() != that.count()) return false;
for (int i = 0; i < this.count(); i++) {
var thisNode = this.get(i);
var thatNode = that.get(i);
if (!thisNode.equals(thatNode)) return false;
}
return true;
}

@Override
public int hashCode() {
return Objects.hash(pq, compressedVectors);
int result = 1;
result = 31 * result + pq.hashCode();
result = 31 * result + count();

// We don't use the chunk layout in the hash code calculation because we allow for different chunking
// strategies. Instead, we hash the logical vector entries, so equal contents produce equal hash codes.
for (int i = 0; i < count(); i++)
result = 31 * result + get(i).hashCode();

return result;
}

@Override
@@ -188,7 +256,43 @@ public ScoreFunction.ApproximateScoreFunction scoreFunctionFor(VectorFloat<?> q,
}

public ByteSequence<?> get(int ordinal) {
return compressedVectors.get(ordinal);
if (ordinal < 0 || ordinal >= vectorCount)
throw new IndexOutOfBoundsException("Ordinal " + ordinal + " out of bounds for vector count " + vectorCount);
return get(compressedDataChunks, ordinal, vectorsPerChunk, pq.getSubspaceCount());
}

private static ByteSequence<?> get(ByteSequence<?>[] chunks, int ordinal, int vectorsPerChunk, int subspaceCount) {
int chunkIndex = ordinal / vectorsPerChunk;
int vectorIndexInChunk = ordinal % vectorsPerChunk;
int start = vectorIndexInChunk * subspaceCount;
return chunks[chunkIndex].slice(start, subspaceCount);
}

/**
* Encode the given vector and set it at the given ordinal. Done without unnecessary copying.
* @param ordinal the ordinal to set
* @param vector the vector to encode and set
*/
public void encodeAndSet(int ordinal, VectorFloat<?> vector)
{
if (!mutable)
throw new UnsupportedOperationException("Cannot set values on an immutable PQVectors instance");

vectorCount++;
pq.encodeTo(vector, get(ordinal));
}

/**
* Set the vector at the given ordinal to zero.
* @param ordinal the ordinal to set
*/
public void setZero(int ordinal)
{
if (!mutable)
throw new UnsupportedOperationException("Cannot set values on an immutable PQVectors instance");

vectorCount++;
get(ordinal).zero();
}

public ProductQuantization getProductQuantization() {
@@ -225,16 +329,64 @@ public long ramBytesUsed() {
int AH_BYTES = RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;

long codebooksSize = pq.ramBytesUsed();
long listSize = (long) REF_BYTES * (1 + compressedVectors.size());
long dataSize = (long) (OH_BYTES + AH_BYTES + pq.compressedVectorSize()) * compressedVectors.size();
return codebooksSize + listSize + dataSize;
long chunksArraySize = OH_BYTES + AH_BYTES + (long) compressedDataChunks.length * REF_BYTES;
long dataSize = 0;
for (ByteSequence<?> chunk : compressedDataChunks) {
dataSize += chunk.ramBytesUsed();
}
return codebooksSize + chunksArraySize + dataSize;
}

@Override
public String toString() {
return "PQVectors{" +
"pq=" + pq +
", count=" + compressedVectors.size() +
", count=" + vectorCount +
'}';
}

/**
* Build a PQVectors instance from the given RandomAccessVectorValues. The vectors are encoded in parallel
* and split into chunks to avoid exceeding the maximum array size.
* @param pq the ProductQuantization to use
* @param vectorCount the number of vectors to encode
* @param ravv the RandomAccessVectorValues to encode
* @param simdExecutor the ForkJoinPool to use for SIMD operations
* @return the PQVectors instance
*/
public static PQVectors encodeAndBuild(ProductQuantization pq, int vectorCount, RandomAccessVectorValues ravv, ForkJoinPool simdExecutor) {

// Calculate if we need to split into multiple chunks
int compressedDimension = pq.compressedVectorSize();
long totalSize = (long) vectorCount * compressedDimension;
int vectorsPerChunk = totalSize <= MAX_CHUNK_SIZE ? vectorCount : MAX_CHUNK_SIZE / compressedDimension;

int numChunks = vectorCount / vectorsPerChunk;
final ByteSequence<?>[] chunks = new ByteSequence<?>[numChunks];
int chunkSize = vectorsPerChunk * compressedDimension;
for (int i = 0; i < numChunks - 1; i++)
chunks[i] = vectorTypeSupport.createByteSequence(chunkSize);

// Last chunk might be smaller
int remainingVectors = vectorCount - (vectorsPerChunk * (numChunks - 1));
chunks[numChunks - 1] = vectorTypeSupport.createByteSequence(remainingVectors * compressedDimension);

// Encode the vectors in parallel into the compressed data chunks.
// The writes are concurrent but coordinated and non-overlapping, so parallel streams are safe,
// and joining the task after completion guarantees safe publication of the results.
simdExecutor.submit(() -> IntStream.range(0, ravv.size())
.parallel()
.forEach(ordinal -> {
// Retrieve the slice and mutate it.
var slice = get(chunks, ordinal, vectorsPerChunk, pq.getSubspaceCount());
var vector = ravv.getVector(ordinal);
if (vector != null)
pq.encodeTo(vector, slice);
else
slice.zero();
}))
.join();

return new PQVectors(pq, chunks, vectorCount, vectorsPerChunk);
}
}
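Taken together, the diff exposes two construction paths for `PQVectors`. Below is a minimal usage sketch; the wrapper class name is illustrative, all other identifiers come from the diff above, and the `pq`/`ravv` setup is assumed:

```java
import io.github.jbellis.jvector.graph.RandomAccessVectorValues;
import io.github.jbellis.jvector.pq.PQVectors;
import io.github.jbellis.jvector.pq.ProductQuantization;
import io.github.jbellis.jvector.vector.types.VectorFloat;

import java.util.concurrent.ForkJoinPool;

class PQVectorsUsage {
    static PQVectors buildBoth(ProductQuantization pq, RandomAccessVectorValues ravv) {
        // 1. Bulk path: encode every vector in parallel into dense, chunked storage.
        PQVectors bulk = PQVectors.encodeAndBuild(pq, ravv.size(), ravv, ForkJoinPool.commonPool());

        // 2. Incremental path: pre-size a mutable instance, then encode vectors in place.
        //    encodeAndSet/setZero advance the logical vector count as they run.
        PQVectors incremental = new PQVectors(pq, ravv.size());
        for (int i = 0; i < ravv.size(); i++) {
            VectorFloat<?> v = ravv.getVector(i);
            if (v != null)
                incremental.encodeAndSet(i, v); // encodes straight into the backing chunk
            else
                incremental.setZero(i);         // null vectors become all-zero codes
        }
        return bulk;
    }
}
```

Both paths share the same dense layout: each vector occupies `pq.compressedVectorSize()` bytes, chunks are capped near the Java array-size limit, and `get(ordinal)` slices vector `ordinal` out of chunk `ordinal / vectorsPerChunk` without copying.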