Skip to content

Commit

Permalink
Dynamic Rate Limiter (#574)
Browse files: browse the repository at this point in the history
Co-authored-by: Sundaram Ananthanarayanan <[email protected]>
  • Loading branch information
sundargates and sundargates authored Nov 1, 2023
1 parent 6cd3b66 commit 035b3e0
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class ResourceClusterAkkaImpl extends ResourceClusterGatewayAkkaImpl implements ResourceCluster {

Expand All @@ -68,7 +69,7 @@ public ResourceClusterAkkaImpl(
Duration askTimeout,
ClusterID clusterID,
ResourceClusterTaskExecutorMapper mapper,
int rateLimitPerSecond) {
Supplier<Integer> rateLimitPerSecond) {
super(resourceClusterManagerActor, askTimeout, mapper, rateLimitPerSecond);
this.clusterID = clusterID;
this.mapper = mapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
import io.mantisrx.shaded.com.google.common.util.concurrent.RateLimiter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -48,11 +49,8 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
private final Counter heartbeatCounter;
private final Counter disconnectionCounter;
private final Counter throttledCounter;
private final RateLimiter rateLimiter;

/**
* Throttle requests to a single ResourceCluster actor to avoid a piled-up mailbox.
*/
private final Semaphore semaphore;

// todo: cleanup scheduler on service shutdown.
private final ScheduledExecutorService semaphoreResetScheduler = Executors.newScheduledThreadPool(1);
Expand All @@ -61,17 +59,18 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
ActorRef resourceClusterManagerActor,
Duration askTimeout,
ResourceClusterTaskExecutorMapper mapper,
int maxConcurrentRequestCount) {
Supplier<Integer> maxConcurrentRequestCount) {
this.resourceClusterManagerActor = resourceClusterManagerActor;
this.askTimeout = askTimeout;
this.mapper = mapper;

log.info("Setting maxConcurrentRequestCount for resourceCluster gateway {}", maxConcurrentRequestCount);
this.semaphore = new Semaphore(maxConcurrentRequestCount);
this.rateLimiter = RateLimiter.create(maxConcurrentRequestCount.get());
semaphoreResetScheduler.scheduleAtFixedRate(() -> {
semaphore.drainPermits();
semaphore.release(maxConcurrentRequestCount);
}, 1, 1, TimeUnit.SECONDS);
int newRate = maxConcurrentRequestCount.get();
log.info("Setting the rate limiter rate to {}", newRate);
rateLimiter.setRate(newRate);
}, 1, 1, TimeUnit.MINUTES);

Metrics m = new Metrics.Builder()
.id("ResourceClusterGatewayAkkaImpl")
Expand All @@ -89,7 +88,7 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {

private <In, Out> Function<In, CompletableFuture<Out>> withThrottle(Function<In, CompletableFuture<Out>> func) {
return in -> {
if (semaphore.tryAcquire()) {
if (rateLimiter.tryAcquire()) {
return func.apply(in);
} else {
this.throttledCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import io.mantisrx.server.master.config.ConfigurationFactory;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.config.MasterConfiguration;
import io.mantisrx.server.master.persistence.IMantisPersistenceProvider;
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.resourcecluster.ClusterID;
Expand All @@ -34,6 +34,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.apache.flink.runtime.rpc.RpcService;
Expand All @@ -48,7 +49,7 @@ public class ResourceClustersAkkaImpl implements ResourceClusters {
private final ActorRef resourceClustersManagerActor;
private final Duration askTimeout;
private final ResourceClusterTaskExecutorMapper mapper;
private final int rateLimitPerSecond;
private final Supplier<Integer> rateLimitPerSecond;
private final ConcurrentMap<ClusterID, ResourceCluster> cache =
new ConcurrentHashMap<>();

Expand Down Expand Up @@ -77,7 +78,7 @@ public CompletableFuture<Set<ClusterID>> listActiveClusters() {
}

public static ResourceClusters load(
MasterConfiguration masterConfiguration,
ConfigurationFactory masterConfiguration,
RpcService rpcService,
ActorSystem actorSystem,
MantisJobStore mantisJobStore,
Expand All @@ -86,15 +87,15 @@ public static ResourceClusters load(
IMantisPersistenceProvider persistenceProvider) {
final ActorRef resourceClusterManagerActor =
actorSystem.actorOf(
ResourceClustersManagerActor.props(masterConfiguration, Clock.systemDefaultZone(),
ResourceClustersManagerActor.props(masterConfiguration.getConfig(), Clock.systemDefaultZone(),
rpcService, mantisJobStore, resourceClusterHostActorRef, persistenceProvider,
jobMessageRouter));
final ResourceClusterTaskExecutorMapper globalMapper =
ResourceClusterTaskExecutorMapper.inMemory();

final Duration askTimeout = java.time.Duration.ofMillis(
ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs());
final int rateLimitPerSecond = masterConfiguration.getResourceClusterActionsPermitsPerSecond();
final Supplier<Integer> rateLimitPerSecond = () -> masterConfiguration.getConfig().getResourceClusterActionsPermitsPerSecond();
return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, globalMapper, rateLimitPerSecond);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public MasterMain(ConfigurationFactory configFactory, AuditEventSubscriber audit
RpcUtils.createRemoteRpcService(rpcSystem, configuration, null, "6123", null, Optional.empty());
final ResourceClusters resourceClusters =
ResourceClustersAkkaImpl.load(
getConfig(),
configFactory,
rpcService,
system,
mantisJobStore,
Expand Down

0 comments on commit 035b3e0

Please sign in to comment.