Skip to content

Commit

Permalink
Add concurrent requests
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Mar 6, 2024
1 parent 1f2503d commit b4b8279
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class ConnectionConfiguration {
public static final String NETWORK_POLICY_NAME = "network_policy_name";
public static final String VPCE_ID = "vpce_id";
public static final String REQUEST_COMPRESSION_ENABLED = "enable_request_compression";
public static final String CLIENTS = "clients";

/**
* The valid port range per https://tools.ietf.org/html/rfc6335.
Expand All @@ -117,6 +118,7 @@ public class ConnectionConfiguration {
private final String serverlessCollectionName;
private final String serverlessVpceId;
private final boolean requestCompressionEnabled;
private final Integer clients;

List<String> getHosts() {
return hosts;
Expand Down Expand Up @@ -178,6 +180,10 @@ boolean isRequestCompressionEnabled() {
return requestCompressionEnabled;
}

Integer getClients() {
return clients;
}

private ConnectionConfiguration(final Builder builder) {
this.hosts = builder.hosts;
this.username = builder.username;
Expand All @@ -198,6 +204,7 @@ private ConnectionConfiguration(final Builder builder) {
this.serverlessVpceId = builder.serverlessVpceId;
this.requestCompressionEnabled = builder.requestCompressionEnabled;
this.pipelineName = builder.pipelineName;
this.clients = builder.clients;
}

public static ConnectionConfiguration readConnectionConfiguration(final PluginSetting pluginSetting){
Expand Down Expand Up @@ -275,6 +282,9 @@ public static ConnectionConfiguration readConnectionConfiguration(final PluginSe
REQUEST_COMPRESSION_ENABLED, !DistributionVersion.ES6.equals(distributionVersion));
builder = builder.withRequestCompressionEnabled(requestCompressionEnabled);

final Integer clients = pluginSetting.getIntegerOrDefault(CLIENTS, 1);
builder = builder.withClients(clients);

return builder.build();
}

Expand Down Expand Up @@ -508,6 +518,7 @@ public static class Builder {
private String serverlessCollectionName;
private String serverlessVpceId;
private boolean requestCompressionEnabled;
private Integer clients;

private void validateStsRoleArn(final String awsStsRoleArn) {
final Arn arn = getArn(awsStsRoleArn);
Expand Down Expand Up @@ -637,6 +648,11 @@ public Builder withRequestCompressionEnabled(final boolean requestCompressionEna
return this;
}

public Builder withClients(final Integer clients) {
this.clients = clients;
return this;
}

public ConnectionConfiguration build() {
return new ConnectionConfiguration(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -139,6 +144,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private DlqProvider dlqProvider;
private final ConcurrentHashMap<Long, AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>> bulkRequestMap;
private final ConcurrentHashMap<Long, Long> lastFlushTimeMap;
private final RequestSender requestSender;

@DataPrepperPluginConstructor
public OpenSearchSink(final PluginSetting pluginSetting,
Expand Down Expand Up @@ -185,6 +191,8 @@ public OpenSearchSink(final PluginSetting pluginSetting,
dlqPluginSetting.setPipelineName(pluginSetting.getPipelineName());
dlqProvider = pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting);
}

this.requestSender = new RequestSender(openSearchSinkConfig.getConnectionConfiguration().getClients());
}

@Override
Expand Down Expand Up @@ -491,6 +499,10 @@ SerializedJson getDocument(final Event event) {
}

private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
requestSender.sendRequest(() -> doFlushBatch(accumulatingBulkRequest));
}

private Void doFlushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
bulkRequestTimer.record(() -> {
try {
LOG.debug("Sending data to OpenSearch");
Expand All @@ -502,6 +514,8 @@ private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
Thread.currentThread().interrupt();
}
});

return null;
}

private void logFailureForBulkRequests(final List<FailedBulkOperation> failedBulkOperations, final Throwable failure) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.opensearch.dataprepper.plugins.sink.opensearch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;

public class RequestSender {
private static final Logger LOG = LoggerFactory.getLogger(RequestSender.class);

private final List<Future<Void>> pendingRequestFutures;
private final ExecutorService requestExecutor;
private final CompletionService<Void> completionService;
private final int concurrentRequestCount;
private final ReentrantLock reentrantLock;

public RequestSender(final int concurrentRequestCount) {
this.concurrentRequestCount = concurrentRequestCount;
pendingRequestFutures = new ArrayList<>();
requestExecutor = Executors.newFixedThreadPool(concurrentRequestCount);
completionService = new ExecutorCompletionService(requestExecutor);
reentrantLock = new ReentrantLock();
}

public void sendRequest(final Callable<Void> requestRunnable) {
reentrantLock.lock();

if (pendingRequestFutures.size() >= concurrentRequestCount) {
waitForRequestSlot();
}

final Future<Void> future = completionService.submit(requestRunnable);
pendingRequestFutures.add(future);

reentrantLock.unlock();
}

private void waitForRequestSlot() {
do {
checkFutureCompletion();
if (isRequestQueueFull()) {
try {
LOG.info("Request queue is full, waiting for slot to free up");
completionService.take();
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for future completion");
}
}
} while (isRequestQueueFull());
}

private boolean isRequestQueueFull() {
return pendingRequestFutures.size() >= concurrentRequestCount;
}

private void checkFutureCompletion() {
for (Iterator<Future<Void>> iterator = pendingRequestFutures.iterator(); iterator.hasNext();) {
final Future<Void> future = iterator.next();
if (future.isCancelled()) {
try {
future.get();
} catch (final Exception e) {
LOG.error("Indexing future was cancelled", e);
iterator.remove();
return;
}
}

if (future.isDone()) {
iterator.remove();
}
}
}
}

0 comments on commit b4b8279

Please sign in to comment.