Skip to content

Commit

Permalink
Introduce derived vector source via stored fields (opensearch-project…
Browse files Browse the repository at this point in the history
…#2449)

Generates the vector source in the source field from the
KnnVectorsFormat or BVD. It does this by adding StoredFieldsFormat to
our existing custom codec.

Currently, feature is experimental and behind a feature flag via index 
setting. In the future, we need to iterate to improve performance 
and stability for nested/object portions.

Signed-off-by: John Mazanec <[email protected]>
  • Loading branch information
jmazanec15 authored Jan 29, 2025
1 parent 168ee3c commit 59b8e6b
Show file tree
Hide file tree
Showing 29 changed files with 3,015 additions and 52 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a new build mode, `FAISS_OPT_LEVEL=avx512_spr`, which enables the use of advanced AVX-512 instructions introduced with Intel(R) Sapphire Rapids (#2404)[https://github.com/opensearch-project/k-NN/pull/2404]
- Add cosine similarity support for faiss engine (#2376)[https://github.com/opensearch-project/k-NN/pull/2376]
- Add concurrency optimizations with native memory graph loading and force eviction (#2265) [https://github.com/opensearch-project/k-NN/pull/2345]

- Add derived source feature for vector fields (#2449)[https://github.com/opensearch-project/k-NN/pull/2449]
### Enhancements
- Introduced a writing layer in native engines where relies on the writing interface to process IO. (#2241)[https://github.com/opensearch-project/k-NN/pull/2241]
- Allow method parameter override for training based indices (#2290) https://github.com/opensearch-project/k-NN/pull/2290]
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ integTest {
systemProperty("https", is_https)
systemProperty("user", user)
systemProperty("password", password)
systemProperty("test.exhaustive", System.getProperty("test.exhaustive"))

doFirst {
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can
Expand Down Expand Up @@ -455,6 +456,7 @@ task integTestRemote(type: RestIntegTestTask) {
systemProperty 'cluster.number_of_nodes', "${_numNodes}"

systemProperty 'tests.security.manager', 'false'
systemProperty("test.exhaustive", System.getProperty("test.exhaustive"))

// Run tests with remote cluster only if rest case is defined
if (System.getProperty("tests.rest.cluster") != null) {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/opensearch/knn/common/KNNConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,8 @@ public class KNNConstants {

public static final String MODE_PARAMETER = "mode";
public static final String COMPRESSION_LEVEL_PARAMETER = "compression_level";

public static final String DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY = "knn-derived-source-enabled";
public static final String DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE = "true";
public static final String DERIVED_VECTOR_FIELD_ATTRIBUTE_FALSE_VALUE = "false";
}
23 changes: 22 additions & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class KNNSettings {
public static final String KNN_FAISS_AVX512_DISABLED = "knn.faiss.avx512.disabled";
public static final String KNN_FAISS_AVX512_SPR_DISABLED = "knn.faiss.avx512_spr.disabled";
public static final String KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED = "index.knn.disk.vector.shard_level_rescoring_disabled";
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";

/**
* Default setting values
Expand Down Expand Up @@ -269,6 +270,14 @@ public class KNNSettings {
Setting.Property.Dynamic
);

public static final Setting<Boolean> KNN_DERIVED_SOURCE_ENABLED_SETTING = Setting.boolSetting(
KNN_DERIVED_SOURCE_ENABLED,
false,
IndexScope,
Final,
UnmodifiableOnRestore
);

/**
* This setting identifies KNN index.
*/
Expand Down Expand Up @@ -518,6 +527,9 @@ private Setting<?> getSetting(String key) {
if (KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED.equals(key)) {
return KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING;
}
if (KNN_DERIVED_SOURCE_ENABLED.equals(key)) {
return KNN_DERIVED_SOURCE_ENABLED_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}
Expand All @@ -543,7 +555,8 @@ public List<Setting<?>> getSettings() {
KNN_FAISS_AVX512_SPR_DISABLED_SETTING,
QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING,
QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES_SETTING,
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
KNN_DERIVED_SOURCE_ENABLED_SETTING
);
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
.collect(Collectors.toList());
Expand Down Expand Up @@ -581,6 +594,14 @@ public static boolean isFaissAVX2Disabled() {
}
}

/**
* check this index enabled/disabled derived source
* @param settings Settings
*/
public static boolean isKNNDerivedSourceEnabled(Settings settings) {
return KNN_DERIVED_SOURCE_ENABLED_SETTING.get(settings);
}

public static boolean isFaissAVX512Disabled() {
return Booleans.parseBoolean(
Objects.requireNonNullElse(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.KNN9120Codec;

import lombok.AllArgsConstructor;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.opensearch.common.Nullable;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
import org.opensearch.knn.index.mapper.KNNVectorFieldType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opensearch.knn.common.KNNConstants.DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY;
import static org.opensearch.knn.common.KNNConstants.DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE;

@AllArgsConstructor
public class DerivedSourceStoredFieldsFormat extends StoredFieldsFormat {

private final StoredFieldsFormat delegate;
private final DerivedSourceReadersSupplier derivedSourceReadersSupplier;
// IMPORTANT Do not rely on this for the reader, it will be null if SPI is used
@Nullable
private final MapperService mapperService;

@Override
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, IOContext ioContext)
throws IOException {
List<FieldInfo> derivedVectorFields = null;
for (FieldInfo fieldInfo : fieldInfos) {
if (DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE.equals(fieldInfo.attributes().get(DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY))) {
// Lazily initialize the list of fields
if (derivedVectorFields == null) {
derivedVectorFields = new ArrayList<>();
}
derivedVectorFields.add(fieldInfo);
}
}
// If no fields have it enabled, we can just short-circuit and return the delegate's fieldReader
if (derivedVectorFields == null || derivedVectorFields.isEmpty()) {
return delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext);
}
return new DerivedSourceStoredFieldsReader(
delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext),
derivedVectorFields,
derivedSourceReadersSupplier,
new SegmentReadState(directory, segmentInfo, fieldInfos, ioContext)
);
}

@Override
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo segmentInfo, IOContext ioContext) throws IOException {
StoredFieldsWriter delegateWriter = delegate.fieldsWriter(directory, segmentInfo, ioContext);
if (mapperService != null && KNNSettings.isKNNDerivedSourceEnabled(mapperService.getIndexSettings().getSettings())) {
List<String> vectorFieldTypes = new ArrayList<>();
for (MappedFieldType fieldType : mapperService.fieldTypes()) {
if (fieldType instanceof KNNVectorFieldType) {
vectorFieldTypes.add(fieldType.name());
}
}
if (vectorFieldTypes.isEmpty() == false) {
return new DerivedSourceStoredFieldsWriter(delegateWriter, vectorFieldTypes);
}
}
return delegateWriter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.KNN9120Codec;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.util.IOUtils;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceStoredFieldVisitor;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorInjector;

import java.io.IOException;
import java.util.List;

public class DerivedSourceStoredFieldsReader extends StoredFieldsReader {
private final StoredFieldsReader delegate;
private final List<FieldInfo> derivedVectorFields;
private final DerivedSourceReadersSupplier derivedSourceReadersSupplier;
private final SegmentReadState segmentReadState;
private final boolean shouldInject;

private final DerivedSourceVectorInjector derivedSourceVectorInjector;

/**
*
* @param delegate delegate StoredFieldsReader
* @param derivedVectorFields List of fields that are derived source fields
* @param derivedSourceReadersSupplier Supplier for the derived source readers
* @param segmentReadState SegmentReadState for the segment
* @throws IOException in case of I/O error
*/
public DerivedSourceStoredFieldsReader(
StoredFieldsReader delegate,
List<FieldInfo> derivedVectorFields,
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
SegmentReadState segmentReadState
) throws IOException {
this(delegate, derivedVectorFields, derivedSourceReadersSupplier, segmentReadState, true);
}

private DerivedSourceStoredFieldsReader(
StoredFieldsReader delegate,
List<FieldInfo> derivedVectorFields,
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
SegmentReadState segmentReadState,
boolean shouldInject
) throws IOException {
this.delegate = delegate;
this.derivedVectorFields = derivedVectorFields;
this.derivedSourceReadersSupplier = derivedSourceReadersSupplier;
this.segmentReadState = segmentReadState;
this.shouldInject = shouldInject;
this.derivedSourceVectorInjector = createDerivedSourceVectorInjector();
}

private DerivedSourceVectorInjector createDerivedSourceVectorInjector() throws IOException {
return new DerivedSourceVectorInjector(derivedSourceReadersSupplier, segmentReadState, derivedVectorFields);
}

@Override
public void document(int docId, StoredFieldVisitor storedFieldVisitor) throws IOException {
// If the visitor has explicitly indicated it does not need the fields, we should not inject them
boolean isVisitorNeedFields = true;
if (storedFieldVisitor instanceof FieldsVisitor) {
isVisitorNeedFields = derivedSourceVectorInjector.shouldInject(
((FieldsVisitor) storedFieldVisitor).includes(),
((FieldsVisitor) storedFieldVisitor).excludes()
);
}
if (shouldInject && isVisitorNeedFields) {
delegate.document(docId, new DerivedSourceStoredFieldVisitor(storedFieldVisitor, docId, derivedSourceVectorInjector));
return;
}
delegate.document(docId, storedFieldVisitor);
}

@Override
public StoredFieldsReader clone() {
try {
return new DerivedSourceStoredFieldsReader(
delegate.clone(),
derivedVectorFields,
derivedSourceReadersSupplier,
segmentReadState,
shouldInject
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void checkIntegrity() throws IOException {
delegate.checkIntegrity();
}

@Override
public void close() throws IOException {
IOUtils.close(delegate, derivedSourceVectorInjector);
}

/**
* For merging, we need to tell the derived source stored fields reader to skip injecting the source. Otherwise,
* on merge we will end up just writing the source to disk
*
* @return Merged instance that wont inject by default
*/
@Override
public StoredFieldsReader getMergeInstance() {
try {
return new DerivedSourceStoredFieldsReader(
delegate.getMergeInstance(),
derivedVectorFields,
derivedSourceReadersSupplier,
segmentReadState,
false
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Loading

0 comments on commit 59b8e6b

Please sign in to comment.