Skip to content

Commit

Permalink
Moved the singleton instance of S3 to per request instance, to fix so…
Browse files Browse the repository at this point in the history
…me timeout issues and started passing spaceType in the create Index request

Signed-off-by: Navneet Verma <[email protected]>
  • Loading branch information
navneet1v committed Jan 3, 2025
1 parent f5b71a8 commit e9e36cd
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public class KNNSettings {

public static final Setting<TimeValue> REMOTE_INDEX_BUILD_STATUS_WAIT_TIME_SETTING = Setting.timeSetting(
REMOTE_INDEX_BUILD_STATUS_WAIT_TIME,
TimeValue.timeValueSeconds(1),
TimeValue.timeValueSeconds(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ public void mergeOneField(final FieldInfo fieldInfo, final MergeState mergeState
return;
}
StopWatch stopWatch = new StopWatch().start();
String context = "local";
if (KNNSettings.isRemoteIndexBuildEnabled() && totalLiveDocs >= KNNSettings.getRemoteIndexBuildMaxDocs()) {
log.info(
"Remote index build is enabled and total live docs {} are greater than equal to the threshold {}",
totalLiveDocs,
KNNSettings.getRemoteIndexBuildMaxDocs()
);
context = "remote";
this.remoteIndexBuild.buildIndexRemotely(fieldInfo, knnVectorValuesSupplier, totalLiveDocs);
} else {
log.info("Building index locally, live docs {}, setting value: {}", totalLiveDocs, KNNSettings.getRemoteIndexBuildMaxDocs());
Expand All @@ -186,7 +188,13 @@ public void mergeOneField(final FieldInfo fieldInfo, final MergeState mergeState

long time_in_millis = stopWatch.stop().totalTime().millis();
KNNGraphValue.MERGE_TOTAL_TIME_IN_MILLIS.incrementBy(time_in_millis);
log.debug("Merge took {} ms for vector field [{}]", time_in_millis, fieldInfo.getName());
log.info(
"Merge {} took {} ms for vector field [{}] with live docs: {}",
context,
time_in_millis,
fieldInfo.getName(),
totalLiveDocs
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.StopWatch;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.vectorvalues.KNNFloatVectorValues;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
Expand All @@ -33,15 +35,14 @@
public class RemoteIndexBuild {
private static final String COMPLETED_STATUS = "completed";
private static final String FAILED_STATUS = "failed";
private final S3Client s3Client;
private S3Client s3Client;
private final IndexBuildServiceClient indexBuildServiceClient;
private final String indexUUID;
private final SegmentWriteState segmentWriteState;

public RemoteIndexBuild(final String indexUUID, final SegmentWriteState segmentWriteState) {
this.indexUUID = indexUUID;
try {
this.s3Client = S3Client.getInstance();
this.indexBuildServiceClient = IndexBuildServiceClient.getInstance();
this.segmentWriteState = segmentWriteState;
} catch (Exception e) {
Expand All @@ -50,6 +51,8 @@ public RemoteIndexBuild(final String indexUUID, final SegmentWriteState segmentW
}

public void buildIndexRemotely(FieldInfo fieldInfo, Supplier<KNNVectorValues<?>> knnVectorValuesSupplier, int totalLiveDocs) {
// TODO: creating a new instance of S3 here, as because of some reason the requests were getting timed out. Need to fix that.
s3Client = new S3Client();
try {
// First upload all the vectors to S3
String objectKey = uploadToS3(fieldInfo, knnVectorValuesSupplier);
Expand All @@ -69,21 +72,23 @@ public void buildIndexRemotely(FieldInfo fieldInfo, Supplier<KNNVectorValues<?>>
}

private GetJobResponse isIndexBuildCompletedWithoutErrors(final CreateIndexResponse response) {
try {
GetJobRequest getJobRequest = GetJobRequest.builder().jobId(response.getIndexCreationRequestId()).build();
log.info("Waiting for index build to be completed: {}", response.getIndexCreationRequestId());
GetJobResponse getJobResponse = indexBuildServiceClient.getJob(getJobRequest);
if (COMPLETED_STATUS.equals(getJobResponse.getStatus()) || FAILED_STATUS.equals(getJobResponse.getStatus())) {
log.info("Remote Index build completed with status: {}", getJobResponse.getStatus());
return getJobResponse;
} else {
log.info("Index build is still in progress. Current status: {}", getJobResponse.getStatus());
// I am using the same merge thread to ensure that we are not completing merge early.
Thread.sleep(KNNSettings.getIndexBuildStatusWaitTime());
return isIndexBuildCompletedWithoutErrors(response);
while (true) {
try {
GetJobRequest getJobRequest = GetJobRequest.builder().jobId(response.getIndexCreationRequestId()).build();
log.info("Waiting for index build to be completed: {}", response.getIndexCreationRequestId());
GetJobResponse getJobResponse = indexBuildServiceClient.getJob(getJobRequest);
if (COMPLETED_STATUS.equals(getJobResponse.getStatus()) || FAILED_STATUS.equals(getJobResponse.getStatus())) {
log.info("Remote Index build completed with status: {}", getJobResponse.getStatus());
return getJobResponse;
} else {
log.info("Index build is still in progress. Current status: {}", getJobResponse.getStatus());
// I am using the same merge thread to ensure that we are not completing merge early.
Thread.sleep(KNNSettings.getIndexBuildStatusWaitTime());
}
} catch (Exception e) {
log.error("Failed to wait for remote index build to be completed: {}", response.getIndexCreationRequestId(), e);
break;
}
} catch (Exception e) {
log.error("Failed to wait for remote index build to be completed: {}", response.getIndexCreationRequestId(), e);
}
return GetJobResponse.builder().status("errored").build();
}
Expand Down Expand Up @@ -135,10 +140,12 @@ private void downloadGraphFileFromS3(GetJobResponse getJobResponse, FieldInfo fi

private CreateIndexRequest buildCreateIndexRequest(final FieldInfo fieldInfo, int totalLiveDocs, String objectKey) {
int dimension = fieldInfo.getVectorDimension();
String spaceType = fieldInfo.attributes().getOrDefault(KNNConstants.SPACE_TYPE, SpaceType.DEFAULT.getValue());
return CreateIndexRequest.builder()
.bucketName(KNNSettings.getKnnS3BucketName())
.objectLocation(objectKey)
.dimensions(dimension)
.spaceType(spaceType)
.numberOfVectors(totalLiveDocs)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class CreateIndexRequest implements ToXContentObject {
String objectLocation;
long numberOfVectors;
int dimensions;
String spaceType;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand All @@ -27,6 +28,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field("object_location", objectLocation)
.field("number_of_vectors", numberOfVectors)
.field("dimensions", dimensions)
.field("space_type", spaceType)
.endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static S3Client getInstance() throws IOException {
}

@SuppressForbidden(reason = "Need to provide this override to v2 SDK so that path does not default to home path")
private S3Client() {
public S3Client() {
SocketAccess.doPrivilegedException(() -> {
if (ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.getStringValue().isEmpty()) {
System.setProperty(
Expand Down

0 comments on commit e9e36cd

Please sign in to comment.