diff --git a/src/main/java/org/opensearch/knn/index/KNNSettings.java b/src/main/java/org/opensearch/knn/index/KNNSettings.java index 03ec87758..e9c0ce255 100644 --- a/src/main/java/org/opensearch/knn/index/KNNSettings.java +++ b/src/main/java/org/opensearch/knn/index/KNNSettings.java @@ -216,7 +216,7 @@ public class KNNSettings { public static final Setting REMOTE_INDEX_BUILD_STATUS_WAIT_TIME_SETTING = Setting.timeSetting( REMOTE_INDEX_BUILD_STATUS_WAIT_TIME, - TimeValue.timeValueSeconds(1), + TimeValue.timeValueSeconds(5), Setting.Property.Dynamic, Setting.Property.NodeScope ); diff --git a/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriter.java b/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriter.java index dcb8a9706..b55acb71f 100644 --- a/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriter.java +++ b/src/main/java/org/opensearch/knn/index/codec/KNN990Codec/NativeEngines990KnnVectorsWriter.java @@ -170,12 +170,14 @@ public void mergeOneField(final FieldInfo fieldInfo, final MergeState mergeState return; } StopWatch stopWatch = new StopWatch().start(); + String context = "local"; if (KNNSettings.isRemoteIndexBuildEnabled() && totalLiveDocs >= KNNSettings.getRemoteIndexBuildMaxDocs()) { log.info( "Remote index build is enabled and total live docs {} are greater than equal to the threshold {}", totalLiveDocs, KNNSettings.getRemoteIndexBuildMaxDocs() ); + context = "remote"; this.remoteIndexBuild.buildIndexRemotely(fieldInfo, knnVectorValuesSupplier, totalLiveDocs); } else { log.info("Building index locally, live docs {}, setting value: {}", totalLiveDocs, KNNSettings.getRemoteIndexBuildMaxDocs()); @@ -186,7 +188,13 @@ public void mergeOneField(final FieldInfo fieldInfo, final MergeState mergeState long time_in_millis = stopWatch.stop().totalTime().millis(); KNNGraphValue.MERGE_TOTAL_TIME_IN_MILLIS.incrementBy(time_in_millis); - log.debug("Merge took 
{} ms for vector field [{}]", time_in_millis, fieldInfo.getName()); + log.info( + "Merge {} took {} ms for vector field [{}] with live docs: {}", + context, + time_in_millis, + fieldInfo.getName(), + totalLiveDocs + ); } /** diff --git a/src/main/java/org/opensearch/knn/index/codec/nativeindex/RemoteIndexBuild.java b/src/main/java/org/opensearch/knn/index/codec/nativeindex/RemoteIndexBuild.java index 896a6ac80..b305ad3e8 100644 --- a/src/main/java/org/opensearch/knn/index/codec/nativeindex/RemoteIndexBuild.java +++ b/src/main/java/org/opensearch/knn/index/codec/nativeindex/RemoteIndexBuild.java @@ -11,7 +11,9 @@ import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.store.IndexOutput; import org.opensearch.common.StopWatch; +import org.opensearch.knn.common.KNNConstants; import org.opensearch.knn.index.KNNSettings; +import org.opensearch.knn.index.SpaceType; import org.opensearch.knn.index.engine.KNNEngine; import org.opensearch.knn.index.vectorvalues.KNNFloatVectorValues; import org.opensearch.knn.index.vectorvalues.KNNVectorValues; @@ -33,7 +35,7 @@ public class RemoteIndexBuild { private static final String COMPLETED_STATUS = "completed"; private static final String FAILED_STATUS = "failed"; - private final S3Client s3Client; + private S3Client s3Client; private final IndexBuildServiceClient indexBuildServiceClient; private final String indexUUID; private final SegmentWriteState segmentWriteState; @@ -41,7 +43,6 @@ public class RemoteIndexBuild { public RemoteIndexBuild(final String indexUUID, final SegmentWriteState segmentWriteState) { this.indexUUID = indexUUID; try { - this.s3Client = S3Client.getInstance(); this.indexBuildServiceClient = IndexBuildServiceClient.getInstance(); this.segmentWriteState = segmentWriteState; } catch (Exception e) { @@ -50,6 +51,8 @@ public RemoteIndexBuild(final String indexUUID, final SegmentWriteState segmentW } public void buildIndexRemotely(FieldInfo fieldInfo, Supplier> knnVectorValuesSupplier, int 
totalLiveDocs) { + // TODO: creating a new S3Client instance here because, for some reason, the requests were timing out. Need to fix that. + s3Client = new S3Client(); try { // First upload all the vectors to S3 String objectKey = uploadToS3(fieldInfo, knnVectorValuesSupplier); @@ -69,21 +72,23 @@ public void buildIndexRemotely(FieldInfo fieldInfo, Supplier> } private GetJobResponse isIndexBuildCompletedWithoutErrors(final CreateIndexResponse response) { - try { - GetJobRequest getJobRequest = GetJobRequest.builder().jobId(response.getIndexCreationRequestId()).build(); - log.info("Waiting for index build to be completed: {}", response.getIndexCreationRequestId()); - GetJobResponse getJobResponse = indexBuildServiceClient.getJob(getJobRequest); - if (COMPLETED_STATUS.equals(getJobResponse.getStatus()) || FAILED_STATUS.equals(getJobResponse.getStatus())) { - log.info("Remote Index build completed with status: {}", getJobResponse.getStatus()); - return getJobResponse; - } else { - log.info("Index build is still in progress. Current status: {}", getJobResponse.getStatus()); - // I am using the same merge thread to ensure that we are not completing merge early. - Thread.sleep(KNNSettings.getIndexBuildStatusWaitTime()); - return isIndexBuildCompletedWithoutErrors(response); + while (true) { + try { + GetJobRequest getJobRequest = GetJobRequest.builder().jobId(response.getIndexCreationRequestId()).build(); + log.info("Waiting for index build to be completed: {}", response.getIndexCreationRequestId()); + GetJobResponse getJobResponse = indexBuildServiceClient.getJob(getJobRequest); + if (COMPLETED_STATUS.equals(getJobResponse.getStatus()) || FAILED_STATUS.equals(getJobResponse.getStatus())) { + log.info("Remote Index build completed with status: {}", getJobResponse.getStatus()); + return getJobResponse; + } else { + log.info("Index build is still in progress. 
Current status: {}", getJobResponse.getStatus()); + // I am using the same merge thread to ensure that we are not completing merge early. + Thread.sleep(KNNSettings.getIndexBuildStatusWaitTime()); + } + } catch (Exception e) { + log.error("Failed to wait for remote index build to be completed: {}", response.getIndexCreationRequestId(), e); + break; } - } catch (Exception e) { - log.error("Failed to wait for remote index build to be completed: {}", response.getIndexCreationRequestId(), e); } return GetJobResponse.builder().status("errored").build(); } @@ -135,10 +140,12 @@ private void downloadGraphFileFromS3(GetJobResponse getJobResponse, FieldInfo fi private CreateIndexRequest buildCreateIndexRequest(final FieldInfo fieldInfo, int totalLiveDocs, String objectKey) { int dimension = fieldInfo.getVectorDimension(); + String spaceType = fieldInfo.attributes().getOrDefault(KNNConstants.SPACE_TYPE, SpaceType.DEFAULT.getValue()); return CreateIndexRequest.builder() .bucketName(KNNSettings.getKnnS3BucketName()) .objectLocation(objectKey) .dimensions(dimension) + .spaceType(spaceType) .numberOfVectors(totalLiveDocs) .build(); } diff --git a/src/main/java/org/opensearch/knn/remote/index/model/CreateIndexRequest.java b/src/main/java/org/opensearch/knn/remote/index/model/CreateIndexRequest.java index fb8501fc4..5e08f4acd 100644 --- a/src/main/java/org/opensearch/knn/remote/index/model/CreateIndexRequest.java +++ b/src/main/java/org/opensearch/knn/remote/index/model/CreateIndexRequest.java @@ -19,6 +19,7 @@ public class CreateIndexRequest implements ToXContentObject { String objectLocation; long numberOfVectors; int dimensions; + String spaceType; @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -27,6 +28,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field("object_location", objectLocation) .field("number_of_vectors", numberOfVectors) .field("dimensions", dimensions) + 
.field("space_type", spaceType) .endObject(); } } diff --git a/src/main/java/org/opensearch/knn/remote/index/s3/S3Client.java b/src/main/java/org/opensearch/knn/remote/index/s3/S3Client.java index 6e1bd33e0..ebcf42e8e 100644 --- a/src/main/java/org/opensearch/knn/remote/index/s3/S3Client.java +++ b/src/main/java/org/opensearch/knn/remote/index/s3/S3Client.java @@ -82,7 +82,7 @@ public static S3Client getInstance() throws IOException { } @SuppressForbidden(reason = "Need to provide this override to v2 SDK so that path does not default to home path") - private S3Client() { + public S3Client() { SocketAccess.doPrivilegedException(() -> { if (ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.getStringValue().isEmpty()) { System.setProperty(