Skip to content

Commit

Permalink
Allow specifying a default Executor when instantiating SdkClient (#47) (
Browse files Browse the repository at this point in the history
#48)

(cherry picked from commit 98c1c31)

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent eb9271b commit 48de32a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,27 @@
public class SdkClient {

private final SdkClientDelegate delegate;
private final Executor defaultExecutor;
private final Boolean isMultiTenancyEnabled;

/**
* Instantiate this client
* Instantiate this client with the {@link ForkJoinPool#commonPool()} as the default executor
* @param delegate The client implementation to delegate calls to
* @param multiTenancy whether multiTenancy is enabled
*/
public SdkClient(SdkClientDelegate delegate, Boolean multiTenancy) {
this(delegate, ForkJoinPool.commonPool(), multiTenancy);
}

/**
* Instantiate this client with a default Executor
* @param delegate The client implementation to delegate calls to
* @param defaultExecutor A default executor to use for asynchronous execution unless otherwise specified
* @param multiTenancy whether multiTenancy is enabled
*/
public SdkClient(SdkClientDelegate delegate, Executor defaultExecutor, Boolean multiTenancy) {
this.delegate = delegate;
this.defaultExecutor = defaultExecutor;
this.isMultiTenancyEnabled = multiTenancy;
}

Expand All @@ -50,12 +62,12 @@ public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRe
}

/**
* Create/Put/Index a data object/document into a table/index.
* Create/Put/Index a data object/document into a table/index using the default executor.
* @param request A request encapsulating the data object to store
* @return A completion stage encapsulating the response or exception
*/
public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRequest request) {
return putDataObjectAsync(request, ForkJoinPool.commonPool());
return putDataObjectAsync(request, defaultExecutor);
}

/**
Expand All @@ -72,7 +84,7 @@ public PutDataObjectResponse putDataObject(PutDataObjectRequest request) {
}

/**
* Read/Get a data object/document from a table/index.
* Read/Get a data object/document from a table/index using the default executor.
*
* @param request A request identifying the data object to retrieve
* @param executor the executor to use for asynchronous execution
Expand All @@ -90,7 +102,7 @@ public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRe
*/
public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRequest request) {
validateTenantId(request.tenantId());
return getDataObjectAsync(request, ForkJoinPool.commonPool());
return getDataObjectAsync(request, defaultExecutor);
}

/**
Expand Down Expand Up @@ -119,13 +131,13 @@ public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDat
}

/**
* Update a data object/document in a table/index.
* Update a data object/document in a table/index using the default executor.
*
* @param request A request identifying the data object to update
* @return A completion stage encapsulating the response or exception
*/
public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDataObjectRequest request) {
return updateDataObjectAsync(request, ForkJoinPool.commonPool());
return updateDataObjectAsync(request, defaultExecutor);
}

/**
Expand Down Expand Up @@ -154,13 +166,13 @@ public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDat
}

/**
* Delete a data object/document from a table/index.
* Delete a data object/document from a table/index using the default executor.
*
* @param request A request identifying the data object to delete
* @return A completion stage encapsulating the response or exception
*/
public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDataObjectRequest request) {
return deleteDataObjectAsync(request, ForkJoinPool.commonPool());
return deleteDataObjectAsync(request, defaultExecutor);
}

/**
Expand Down Expand Up @@ -189,13 +201,13 @@ public CompletionStage<BulkDataObjectResponse> bulkDataObjectAsync(BulkDataObjec
}

/**
* Perform a bulk request for multiple data objects/documents in potentially multiple tables/indices.
* Perform a bulk request for multiple data objects/documents in potentially multiple tables/indices using the default executor.
*
* @param request A request identifying the bulk requests to execute
* @return A completion stage encapsulating the response or exception
*/
public CompletionStage<BulkDataObjectResponse> bulkDataObjectAsync(BulkDataObjectRequest request) {
return bulkDataObjectAsync(request, ForkJoinPool.commonPool());
return bulkDataObjectAsync(request, defaultExecutor);
}

/**
Expand Down Expand Up @@ -225,13 +237,13 @@ public CompletionStage<SearchDataObjectResponse> searchDataObjectAsync(SearchDat
}

/**
* Search for data objects/documents in a table/index.
* Search for data objects/documents in a table/index using the default executor.
*
* @param request A request identifying the data objects to search for
* @return A completion stage encapsulating the response or exception
*/
public CompletionStage<SearchDataObjectResponse> searchDataObjectAsync(SearchDataObjectRequest request) {
return searchDataObjectAsync(request, ForkJoinPool.commonPool());
return searchDataObjectAsync(request, defaultExecutor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY;
Expand All @@ -34,9 +36,26 @@ public class SdkClientFactory {
* @param client The OpenSearch node client used as the default implementation
* @param xContentRegistry The OpenSearch XContentRegistry
* @param metadataSettings A map defining the remote metadata type and configuration
* @return An instance of SdkClient which delegates to an implementation based on Remote Metadata Type
* @return An instance of SdkClient which delegates to an implementation based on Remote Metadata Type. The {@link ForkJoinPool#commonPool()} will be used by default for async execution.
*/
public static SdkClient createSdkClient(Client client, NamedXContentRegistry xContentRegistry, Map<String, String> metadataSettings) {
return createSdkClient(client, xContentRegistry, metadataSettings, ForkJoinPool.commonPool());
}

/**
* Create a new SdkClient with implementation determined by the value of the Remote Metadata Type setting
* @param client The OpenSearch node client used as the default implementation
* @param xContentRegistry The OpenSearch XContentRegistry
* @param metadataSettings A map defining the remote metadata type and configuration
* @param defaultExecutor The default executor to use if another one is not specified
* @return An instance of SdkClient which delegates to an implementation based on Remote Metadata Type
*/
public static SdkClient createSdkClient(
Client client,
NamedXContentRegistry xContentRegistry,
Map<String, String> metadataSettings,
Executor defaultExecutor
) {
String remoteMetadataType = metadataSettings.get(REMOTE_METADATA_TYPE_KEY);
Boolean multiTenancy = Boolean.parseBoolean(metadataSettings.get(TENANT_AWARE_KEY));

Expand All @@ -46,32 +65,33 @@ public static SdkClient createSdkClient(Client client, NamedXContentRegistry xCo
if (Strings.isNullOrEmpty(remoteMetadataType)) {
// Default client does not use SPI
log.info("Using local opensearch cluster as metadata store.", remoteMetadataType);
return createDefaultClient(client, xContentRegistry, metadataSettings, multiTenancy);
return createDefaultClient(client, xContentRegistry, metadataSettings, defaultExecutor, multiTenancy);
} else {
// Use SPI to find the correct client
while (iterator.hasNext()) {
SdkClientDelegate delegate = iterator.next();
if (delegate.supportsMetadataType(remoteMetadataType)) {
log.info("Using {} as metadata store.", remoteMetadataType);
delegate.initialize(metadataSettings);
return new SdkClient(delegate, multiTenancy);
return new SdkClient(delegate, defaultExecutor, multiTenancy);
}
}
}

// If no suitable implementation is found, use the default
log.warn("Unable to find {} client implementation. Using local opensearch cluster as metadata store.", remoteMetadataType);
return createDefaultClient(client, xContentRegistry, metadataSettings, multiTenancy);
return createDefaultClient(client, xContentRegistry, metadataSettings, defaultExecutor, multiTenancy);
}

private static SdkClient createDefaultClient(
Client client,
NamedXContentRegistry xContentRegistry,
Map<String, String> metadataSettings,
Executor defaultExecutor,
Boolean multiTenancy
) {
LocalClusterIndicesClient defaultclient = new LocalClusterIndicesClient(client, xContentRegistry, metadataSettings);
return new SdkClient(defaultclient, multiTenancy);
return new SdkClient(defaultclient, defaultExecutor, multiTenancy);
}

// Package private for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.junit.Assert.assertSame;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -392,4 +395,27 @@ public void testExecutePrivilegedAsyncWithException() throws Exception {
assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS));
assertTrue(future.isCompletedExceptionally());
}

@Test
public void testDefaultExecutor() {
SdkClient sdkClient = new SdkClient(sdkClientImpl, false);
ArgumentCaptor<Executor> executorCaptor = ArgumentCaptor.forClass(Executor.class);
when(sdkClientImpl.getDataObjectAsync(any(GetDataObjectRequest.class), executorCaptor.capture(), anyBoolean()))
.thenCallRealMethod();
sdkClient.getDataObjectAsync(getRequest);
verify(sdkClientImpl).getDataObjectAsync(any(GetDataObjectRequest.class), any(Executor.class), anyBoolean());
assertSame(ForkJoinPool.commonPool(), executorCaptor.getValue());
}

@Test
public void testCustomExecutor() {
Executor customExecutor = mock(Executor.class);
SdkClient sdkClient = new SdkClient(sdkClientImpl, customExecutor, false);
ArgumentCaptor<Executor> executorCaptor = ArgumentCaptor.forClass(Executor.class);
when(sdkClientImpl.getDataObjectAsync(any(GetDataObjectRequest.class), executorCaptor.capture(), anyBoolean()))
.thenCallRealMethod();
sdkClient.getDataObjectAsync(getRequest);
verify(sdkClientImpl).getDataObjectAsync(any(GetDataObjectRequest.class), any(Executor.class), anyBoolean());
assertSame(customExecutor, executorCaptor.getValue());
}
}

0 comments on commit 48de32a

Please sign in to comment.