diff --git a/core/src/main/java/org/opensearch/remote/metadata/client/SdkClient.java b/core/src/main/java/org/opensearch/remote/metadata/client/SdkClient.java index 49f1d16..812f806 100644 --- a/core/src/main/java/org/opensearch/remote/metadata/client/SdkClient.java +++ b/core/src/main/java/org/opensearch/remote/metadata/client/SdkClient.java @@ -26,15 +26,27 @@ public class SdkClient { private final SdkClientDelegate delegate; + private final Executor defaultExecutor; private final Boolean isMultiTenancyEnabled; /** - * Instantiate this client + * Instantiate this client with the {@link ForkJoinPool#commonPool()} as the default executor * @param delegate The client implementation to delegate calls to * @param multiTenancy whether multiTenancy is enabled */ public SdkClient(SdkClientDelegate delegate, Boolean multiTenancy) { + this(delegate, ForkJoinPool.commonPool(), multiTenancy); + } + + /** + * Instantiate this client with a default Executor + * @param delegate The client implementation to delegate calls to + * @param defaultExecutor A default executor to use for asynchronous execution unless otherwise specified + * @param multiTenancy whether multiTenancy is enabled + */ + public SdkClient(SdkClientDelegate delegate, Executor defaultExecutor, Boolean multiTenancy) { this.delegate = delegate; + this.defaultExecutor = defaultExecutor; this.isMultiTenancyEnabled = multiTenancy; } @@ -50,12 +62,12 @@ public CompletionStage putDataObjectAsync(PutDataObjectRe } /** - * Create/Put/Index a data object/document into a table/index. + * Create/Put/Index a data object/document into a table/index using the default executor. * @param request A request encapsulating the data object to store * @return A completion stage encapsulating the response or exception */ public CompletionStage putDataObjectAsync(PutDataObjectRequest request) { - return putDataObjectAsync(request, ForkJoinPool.commonPool()); + return putDataObjectAsync(request, defaultExecutor); } /** @@ -72,7 +84,7 @@ public PutDataObjectResponse putDataObject(PutDataObjectRequest request) { } /** - * Read/Get a data object/document from a table/index. + * Read/Get a data object/document from a table/index using the default executor. * * @param request A request identifying the data object to retrieve * @param executor the executor to use for asynchronous execution @@ -90,7 +102,7 @@ public CompletionStage getDataObjectAsync(GetDataObjectRe */ public CompletionStage getDataObjectAsync(GetDataObjectRequest request) { validateTenantId(request.tenantId()); - return getDataObjectAsync(request, ForkJoinPool.commonPool()); + return getDataObjectAsync(request, defaultExecutor); } /** @@ -119,13 +131,13 @@ public CompletionStage updateDataObjectAsync(UpdateDat } /** - * Update a data object/document in a table/index. + * Update a data object/document in a table/index using the default executor. * * @param request A request identifying the data object to update * @return A completion stage encapsulating the response or exception */ public CompletionStage updateDataObjectAsync(UpdateDataObjectRequest request) { - return updateDataObjectAsync(request, ForkJoinPool.commonPool()); + return updateDataObjectAsync(request, defaultExecutor); } /** @@ -154,13 +166,13 @@ public CompletionStage deleteDataObjectAsync(DeleteDat } /** - * Delete a data object/document from a table/index. + * Delete a data object/document from a table/index using the default executor. * * @param request A request identifying the data object to delete * @return A completion stage encapsulating the response or exception */ public CompletionStage deleteDataObjectAsync(DeleteDataObjectRequest request) { - return deleteDataObjectAsync(request, ForkJoinPool.commonPool()); + return deleteDataObjectAsync(request, defaultExecutor); } /** @@ -189,13 +201,13 @@ public CompletionStage bulkDataObjectAsync(BulkDataObjec } /** - * Perform a bulk request for multiple data objects/documents in potentially multiple tables/indices. + * Perform a bulk request for multiple data objects/documents in potentially multiple tables/indices using the default executor. * * @param request A request identifying the bulk requests to execute * @return A completion stage encapsulating the response or exception */ public CompletionStage bulkDataObjectAsync(BulkDataObjectRequest request) { - return bulkDataObjectAsync(request, ForkJoinPool.commonPool()); + return bulkDataObjectAsync(request, defaultExecutor); } /** @@ -225,13 +237,13 @@ public CompletionStage searchDataObjectAsync(SearchDat } /** - * Search for data objects/documents in a table/index. + * Search for data objects/documents in a table/index using the default executor. * * @param request A request identifying the data objects to search for * @return A completion stage encapsulating the response or exception */ public CompletionStage searchDataObjectAsync(SearchDataObjectRequest request) { - return searchDataObjectAsync(request, ForkJoinPool.commonPool()); + return searchDataObjectAsync(request, defaultExecutor); } /** diff --git a/core/src/main/java/org/opensearch/remote/metadata/client/impl/SdkClientFactory.java b/core/src/main/java/org/opensearch/remote/metadata/client/impl/SdkClientFactory.java index e4f3a97..8052bbc 100644 --- a/core/src/main/java/org/opensearch/remote/metadata/client/impl/SdkClientFactory.java +++ b/core/src/main/java/org/opensearch/remote/metadata/client/impl/SdkClientFactory.java @@ -19,6 +19,8 @@ import java.util.Iterator; import java.util.Map; import java.util.ServiceLoader; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY; import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY; @@ -34,9 +36,26 @@ public class SdkClientFactory { * @param client The OpenSearch node client used as the default implementation * @param xContentRegistry The OpenSearch XContentRegistry * @param metadataSettings A map defining the remote metadata type and configuration - * @return An instance of SdkClient which delegates to an implementation based on Remote Metadata Type + * @return An instance of SdkClient which delegates to an implementation based on Remote Metadata Type. The {@link ForkJoinPool#commonPool()} will be used by default for async execution. */ public static SdkClient createSdkClient(Client client, NamedXContentRegistry xContentRegistry, Map metadataSettings) { + return createSdkClient(client, xContentRegistry, metadataSettings, ForkJoinPool.commonPool()); + } + + /** + * Create a new SdkClient with implementation determined by the value of the Remote Metadata Type setting + * @param client The OpenSearch node client used as the default implementation + * @param xContentRegistry The OpenSearch XContentRegistry + * @param metadataSettings A map defining the remote metadata type and configuration + * @param defaultExecutor The default executor to use if another one is not specified + * @return An instance of SdkClient which delegates to an implementation based on Remote Metadata Type + */ + public static SdkClient createSdkClient( + Client client, + NamedXContentRegistry xContentRegistry, + Map metadataSettings, + Executor defaultExecutor + ) { String remoteMetadataType = metadataSettings.get(REMOTE_METADATA_TYPE_KEY); Boolean multiTenancy = Boolean.parseBoolean(metadataSettings.get(TENANT_AWARE_KEY)); @@ -46,7 +65,7 @@ public static SdkClient createSdkClient(Client client, NamedXContentRegistry xCo if (Strings.isNullOrEmpty(remoteMetadataType)) { // Default client does not use SPI log.info("Using local opensearch cluster as metadata store.", remoteMetadataType); - return createDefaultClient(client, xContentRegistry, metadataSettings, multiTenancy); + return createDefaultClient(client, xContentRegistry, metadataSettings, defaultExecutor, multiTenancy); } else { // Use SPI to find the correct client while (iterator.hasNext()) { @@ -54,24 +73,25 @@ public static SdkClient createSdkClient(Client client, NamedXContentRegistry xCo if (delegate.supportsMetadataType(remoteMetadataType)) { log.info("Using {} as metadata store.", remoteMetadataType); delegate.initialize(metadataSettings); - return new SdkClient(delegate, multiTenancy); + return new SdkClient(delegate, defaultExecutor, multiTenancy); } } } // If no suitable implementation is found, use the default log.warn("Unable to find {} client implementation. Using local opensearch cluster as metadata store.", remoteMetadataType); - return createDefaultClient(client, xContentRegistry, metadataSettings, multiTenancy); + return createDefaultClient(client, xContentRegistry, metadataSettings, defaultExecutor, multiTenancy); } private static SdkClient createDefaultClient( Client client, NamedXContentRegistry xContentRegistry, Map metadataSettings, + Executor defaultExecutor, Boolean multiTenancy ) { LocalClusterIndicesClient defaultclient = new LocalClusterIndicesClient(client, xContentRegistry, metadataSettings); - return new SdkClient(defaultclient, multiTenancy); + return new SdkClient(defaultclient, defaultExecutor, multiTenancy); } // Package private for testing diff --git a/core/src/test/java/org/opensearch/remote/metadata/client/SdkClientTests.java b/core/src/test/java/org/opensearch/remote/metadata/client/SdkClientTests.java index bc96cb5..9efc25a 100644 --- a/core/src/test/java/org/opensearch/remote/metadata/client/SdkClientTests.java +++ b/core/src/test/java/org/opensearch/remote/metadata/client/SdkClientTests.java @@ -21,11 +21,14 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import static org.junit.Assert.assertSame; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -392,4 +395,27 @@ public void testExecutePrivilegedAsyncWithException() throws Exception { assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)); assertTrue(future.isCompletedExceptionally()); } + + @Test + public void testDefaultExecutor() { + SdkClient sdkClient = new SdkClient(sdkClientImpl, false); + ArgumentCaptor executorCaptor = ArgumentCaptor.forClass(Executor.class); + when(sdkClientImpl.getDataObjectAsync(any(GetDataObjectRequest.class), executorCaptor.capture(), anyBoolean())) + .thenCallRealMethod(); + sdkClient.getDataObjectAsync(getRequest); + verify(sdkClientImpl).getDataObjectAsync(any(GetDataObjectRequest.class), any(Executor.class), anyBoolean()); + assertSame(ForkJoinPool.commonPool(), executorCaptor.getValue()); + } + + @Test + public void testCustomExecutor() { + Executor customExecutor = mock(Executor.class); + SdkClient sdkClient = new SdkClient(sdkClientImpl, customExecutor, false); + ArgumentCaptor executorCaptor = ArgumentCaptor.forClass(Executor.class); + when(sdkClientImpl.getDataObjectAsync(any(GetDataObjectRequest.class), executorCaptor.capture(), anyBoolean())) + .thenCallRealMethod(); + sdkClient.getDataObjectAsync(getRequest); + verify(sdkClientImpl).getDataObjectAsync(any(GetDataObjectRequest.class), any(Executor.class), anyBoolean()); + assertSame(customExecutor, executorCaptor.getValue()); + } }