From f981b836f0ace974625d9d722c2b356d7f1c4afa Mon Sep 17 00:00:00 2001 From: Ganesh Ramadurai Date: Mon, 30 Dec 2024 17:00:42 -0800 Subject: [PATCH] Concurrency optimization for graph native loading update Signed-off-by: Ganesh Ramadurai --- CHANGELOG.md | 1 + .../memory/NativeMemoryCacheManager.java | 3 + .../memory/NativeMemoryEntryContext.java | 84 ++++++++++++++++++- .../memory/NativeMemoryLoadStrategy.java | 17 ++-- .../memory/NativeMemoryCacheManagerTests.java | 3 + .../memory/NativeMemoryEntryContextTests.java | 23 +++-- .../memory/NativeMemoryLoadStrategyTests.java | 6 +- 7 files changed, 119 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a19b53fd8..d4b47e055 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add Support for Multi Values in innerHit for Nested k-NN Fields in Lucene and FAISS (#2283)[https://github.com/opensearch-project/k-NN/pull/2283] - Add binary index support for Lucene engine. (#2292)[https://github.com/opensearch-project/k-NN/pull/2292] - Add expand_nested_docs Parameter support to NMSLIB engine (#2331)[https://github.com/opensearch-project/k-NN/pull/2331] +- Add concurrency optimizations with native memory graph loading and force eviction (#2265) [https://github.com/opensearch-project/k-NN/pull/2345] ### Enhancements - Introduced a writing layer in native engines where relies on the writing interface to process IO. 
(#2241)[https://github.com/opensearch-project/k-NN/pull/2241] - Allow method parameter override for training based indices (#2290) https://github.com/opensearch-project/k-NN/pull/2290] diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index b8aecc5a5..185b86ebe 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -328,6 +328,9 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext nativeMemoryEntryC // Cache Miss // Evict before put + // openIndexInput the graph file before proceeding to load the graph into memory + nativeMemoryEntryContext.openIndexInput(); + logger.debug("[KNN] NativeMemoryCacheManager openIndexInput successful"); synchronized (this) { if (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight) { Iterator lruIterator = accessRecencyQueue.iterator(); diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java index 0af13fb46..fbd537a2f 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java @@ -13,11 +13,14 @@ import lombok.Getter; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper; import org.opensearch.knn.index.engine.qframe.QuantizationConfig; import org.opensearch.knn.index.VectorDataType; +import org.opensearch.knn.index.store.IndexInputWithBuffer; import java.io.IOException; import java.util.Map; @@ -26,7 +29,7 @@ 
/** * Encapsulates all information needed to load a component into native memory. */ -public abstract class NativeMemoryEntryContext { +public abstract class NativeMemoryEntryContext implements AutoCloseable { protected final String key; @@ -55,6 +58,18 @@ public String getKey() { */ public abstract Integer calculateSizeInKB(); + /** + * Opens the index input for the graph file so that it is available for graph loading. No-op by default. + */ + + public void openIndexInput() {} + + /** + * Closes the closeable resources held by this {@link NativeMemoryEntryContext}, if any. No-op by default. + */ + @Override + public void close() {} + /** * Loads entry into memory. * @@ -75,6 +90,17 @@ public static class IndexEntryContext extends NativeMemoryEntryContext { @@ -192,6 +264,11 @@ public Integer calculateSizeInKB() { return size; } + @Override + public void openIndexInput() { + return; + } + @Override public NativeMemoryAllocation.TrainingDataAllocation load() { return trainingLoadStrategy.load(this); @@ -278,6 +355,11 @@ public Integer calculateSizeInKB() { return size; } + @Override + public void openIndexInput() { + return; + } + @Override public NativeMemoryAllocation.AnonymousAllocation load() throws IOException { return loadStrategy.load(this); diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java index 8cbdb4fd7..8b4ab57ce 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java @@ -13,12 +13,9 @@ import lombok.extern.log4j.Log4j2; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; import org.opensearch.core.action.ActionListener; import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper; import org.opensearch.knn.index.engine.qframe.QuantizationConfig; -import 
org.opensearch.knn.index.store.IndexInputWithBuffer; import org.opensearch.knn.index.util.IndexUtil; import org.opensearch.knn.jni.JNIService; import org.opensearch.knn.index.engine.KNNEngine; @@ -88,10 +85,16 @@ public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.Inde final int indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024); // Try to open an index input then pass it down to native engine for loading an index. - try (IndexInput readStream = directory.openInput(vectorFileName, IOContext.READONCE)) { - final IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream); - final long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine); - + // openIndexInput takes care of opening the indexInput file + if (!indexEntryContext.isIndexInputOpened()) { + throw new IllegalStateException("Index [" + indexEntryContext.getOpenSearchIndexName() + "] is not preloaded"); + } + try (indexEntryContext) { + final long indexAddress = JNIService.loadIndex( + indexEntryContext.indexInputWithBuffer, + indexEntryContext.getParameters(), + knnEngine + ); return createIndexAllocation(indexEntryContext, knnEngine, indexAddress, indexSizeKb, vectorFileName); } } diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java index 5fe41c88c..d356b2d7f 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java @@ -542,6 +542,9 @@ public Integer calculateSizeInKB() { return size; } + @Override + public void openIndexInput() {} + @Override public TestNativeMemoryAllocation load() throws IOException { return new TestNativeMemoryAllocation(size, memoryAddress); diff --git 
a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java index 5379abc74..ce3f45f00 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryEntryContextTests.java @@ -30,6 +30,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doReturn; public class NativeMemoryEntryContextTests extends KNNTestCase { @@ -42,12 +44,14 @@ public void testAbstract_getKey() { public void testIndexEntryContext_load() throws IOException { NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy = mock(NativeMemoryLoadStrategy.IndexLoadStrategy.class); - NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext( - (Directory) null, - TestUtils.createFakeNativeMamoryCacheKey("test"), - indexLoadStrategy, - null, - "test" + NativeMemoryEntryContext.IndexEntryContext indexEntryContext = spy( + new NativeMemoryEntryContext.IndexEntryContext( + (Directory) null, + TestUtils.createFakeNativeMamoryCacheKey("test"), + indexLoadStrategy, + null, + "test" + ) ); NativeMemoryAllocation.IndexAllocation indexAllocation = new NativeMemoryAllocation.IndexAllocation( @@ -61,6 +65,8 @@ public void testIndexEntryContext_load() throws IOException { when(indexLoadStrategy.load(indexEntryContext)).thenReturn(indexAllocation); + // since we are returning mock instance, set indexEntryContext.indexInputOpened to true. 
+ doReturn(true).when(indexEntryContext).isIndexInputOpened(); assertEquals(indexAllocation, indexEntryContext.load()); } @@ -292,6 +298,11 @@ public Integer calculateSizeInKB() { return size; } + @Override + public void openIndexInput() { + return; + } + @Override public TestNativeMemoryAllocation load() throws IOException { return null; diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java index 735974bd1..7da68f9ee 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategyTests.java @@ -69,8 +69,7 @@ public void testIndexLoadStrategy_load() throws IOException { ); // Load - NativeMemoryAllocation.IndexAllocation indexAllocation = NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance() - .load(indexEntryContext); + NativeMemoryAllocation.IndexAllocation indexAllocation = indexEntryContext.load(); // Confirm that the file was loaded by querying float[] query = new float[dimension]; @@ -115,8 +114,7 @@ public void testLoad_whenFaissBinary_thenSuccess() throws IOException { ); // Load - NativeMemoryAllocation.IndexAllocation indexAllocation = NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance() - .load(indexEntryContext); + NativeMemoryAllocation.IndexAllocation indexAllocation = indexEntryContext.load(); // Verify assertTrue(indexAllocation.isBinaryIndex());