Skip to content

Commit

Permalink
add context stash/restore to write operation
Browse files Browse the repository at this point in the history
Signed-off-by: Brian Flores <[email protected]>
  • Loading branch information
brianf-aws committed Jan 14, 2025
1 parent fc8ded6 commit 6323086
Showing 1 changed file with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilters;
Expand Down Expand Up @@ -206,13 +207,22 @@ private void bulkSetModelIndexToUndeploy(
bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
log.info("No nodes service: {}", Arrays.toString(modelIds));

client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> {
log.debug("Successfully set modelIds to UNDEPLOY in index");
listener.onResponse(new MLUndeployModelsResponse(response));
}, e -> {
log.error("Failed to set modelIds to UNDEPLOY in index", e);
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<MLUndeployModelsResponse> listenerWithContextRestoration = ActionListener.runBefore(listener, () -> threadContext.restore());
ActionListener<BulkResponse> bulkResponseListener = ActionListener.wrap(br -> {
log.debug("Successfully set modelIds to UNDEPLOY in index");
listenerWithContextRestoration.onResponse(new MLUndeployModelsResponse(response));
}, e -> {
log.error("Failed to set modelIds to UNDEPLOY in index", e);
listenerWithContextRestoration.onFailure(e);
});

client.bulk(bulkUpdateRequest, bulkResponseListener);
} catch (Exception e) {
log.error("Unexpected error while setting modelIds to UNDEPLOY status to index", e);
listener.onFailure(e);
}));
}

}

private void validateAccess(String modelId, ActionListener<Boolean> listener) {
Expand Down

0 comments on commit 6323086

Please sign in to comment.