Skip to content

Commit

Permalink
Refactoring ResourceClusterActor class (#583)
Browse files Browse the repository at this point in the history
* Adding some more precondition to check what's going on

* Some minor refactoring and added some additional debugging in ResourceCluster classes

1. Get rid of class ResourceClusterTaskExecutorMapper as it was not being used
2. Logging all request/response pairs from ResourceClusterRoute to understand the issues going on

* Some more debug logs

* Fixing compilation

* Enable parallel builds in mantis

* Adding more debug logs

---------

Co-authored-by: Sundaram Ananthanarayanan <[email protected]>
  • Loading branch information
sundargates and sundargates authored Nov 14, 2023
1 parent 3f575ed commit 81bce54
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 83 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
org.gradle.parallel=false
org.gradle.parallel=true
org.gradle.caching=false

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@

import static akka.http.javadsl.server.PathMatchers.segment;

import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.PathMatcher0;
import akka.http.javadsl.server.PathMatchers;
import akka.http.javadsl.server.Rejection;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.server.directives.LogEntry;
import io.mantisrx.master.api.akka.route.Jackson;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.resourcecluster.ResourceClusters;
import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
import java.util.List;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -42,16 +48,26 @@
@Slf4j
@RequiredArgsConstructor
public class ResourceClustersLeaderExclusiveRoute extends BaseRoute {

private static final PathMatcher0 RESOURCECLUSTERS_API_PREFIX =
segment("api").slash("v1").slash("resourceClusters");

private final ResourceClusters gateway;

private Optional<LogEntry> onRequestCompletion(HttpRequest request, HttpResponse response) {
log.debug("ResourceClustersLeaderExclusiveRoute: {} {}", request, response);
return Optional.empty();
}

private Optional<LogEntry> onRequestRejection(HttpRequest request, List<Rejection> rejections) {
return Optional.empty();
}

@Override
protected Route constructRoutes() {
return pathPrefix(
RESOURCECLUSTERS_API_PREFIX,
() -> concat(
() -> logRequestResultOptional(this::onRequestCompletion, this::onRequestRejection, () -> concat(
// /{}/actions/registerTaskExecutor
path(
PathMatchers.segment().slash("actions").slash("registerTaskExecutor"),
Expand Down Expand Up @@ -87,7 +103,7 @@ protected Route constructRoutes() {
post(() -> disconnectTaskExecutor(getClusterID(clusterName)))
))
)
));
)));
}

private Route registerTaskExecutor(ClusterID clusterID) {
Expand Down Expand Up @@ -127,7 +143,8 @@ private Route notifyTaskExecutorStatusChange(ClusterID clusterID) {
clusterID.getResourceID(),
request);

return withFuture(gateway.getClusterFor(clusterID).notifyTaskExecutorStatusChange(request));
return withFuture(
gateway.getClusterFor(clusterID).notifyTaskExecutorStatusChange(request));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,27 +546,33 @@ private boolean isTaskExecutorDisabled(TaskExecutorRegistration registration) {
}

private void onHeartbeat(TaskExecutorHeartbeat heartbeat) {
log.debug("Received heartbeat {} from task executor {}", heartbeat, heartbeat.getTaskExecutorID());
setupTaskExecutorStateIfNecessary(heartbeat.getTaskExecutorID());
try {
final TaskExecutorID taskExecutorID = heartbeat.getTaskExecutorID();
final TaskExecutorState state = this.executorStateManager.get(taskExecutorID);
if (state.getRegistration() == null || !state.isRegistered()) {
TaskExecutorRegistration registration = this.mantisJobStore.getTaskExecutor(heartbeat.getTaskExecutorID());
if (registration != null) {
state.onRegistration(registration);
log.debug("Found registration {} for task executor {}", registration, heartbeat.getTaskExecutorID());
Preconditions.checkState(state.onRegistration(registration));
} else {
// TODO(sundaram): add a metric
log.warn("Received heartbeat from unknown task executor {}", heartbeat.getTaskExecutorID());
sender().tell(new Status.Failure(new TaskExecutorNotFoundException(taskExecutorID)), self());
return;
}
} else {
log.debug("Found registration {} for registered task executor {}",
state.getRegistration(), heartbeat.getTaskExecutorID());
}
boolean stateChange = state.onHeartbeat(heartbeat);
if (stateChange && state.isAvailable()) {
this.executorStateManager.tryMarkAvailable(taskExecutorID);
}

updateHeartbeatTimeout(heartbeat.getTaskExecutorID());
log.debug("Successfully processed heartbeat {} from task executor {}", heartbeat, heartbeat.getTaskExecutorID());
sender().tell(Ack.getInstance(), self());
} catch (Exception e) {
sender().tell(new Status.Failure(e), self());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import io.mantisrx.server.master.resourcecluster.ContainerSkuID;
import io.mantisrx.server.master.resourcecluster.PagedActiveJobOverview;
import io.mantisrx.server.master.resourcecluster.ResourceCluster;
import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper;
import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest;
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
Expand All @@ -62,17 +61,14 @@
class ResourceClusterAkkaImpl extends ResourceClusterGatewayAkkaImpl implements ResourceCluster {

private final ClusterID clusterID;
private final ResourceClusterTaskExecutorMapper mapper;

public ResourceClusterAkkaImpl(
ActorRef resourceClusterManagerActor,
Duration askTimeout,
ClusterID clusterID,
ResourceClusterTaskExecutorMapper mapper,
Supplier<Integer> rateLimitPerSecond) {
super(resourceClusterManagerActor, askTimeout, mapper, rateLimitPerSecond);
super(resourceClusterManagerActor, askTimeout, rateLimitPerSecond);
this.clusterID = clusterID;
this.mapper = mapper;
}

@Override
Expand All @@ -87,9 +83,7 @@ public CompletableFuture<Ack> initializeTaskExecutor(TaskExecutorID taskExecutor
new InitializeTaskExecutorRequest(taskExecutorID, workerId),
askTimeout)
.thenApply(Ack.class::cast)
.toCompletableFuture()
.whenComplete((ack, dontCare) ->
mapper.onTaskExecutorDiscovered(clusterID, taskExecutorID));
.toCompletableFuture();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.server.master.resourcecluster.RequestThrottledException;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper;
import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
Expand All @@ -44,7 +43,6 @@
class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
protected final ActorRef resourceClusterManagerActor;
protected final Duration askTimeout;
private final ResourceClusterTaskExecutorMapper mapper;
private final Counter registrationCounter;
private final Counter heartbeatCounter;
private final Counter disconnectionCounter;
Expand All @@ -58,11 +56,9 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
ResourceClusterGatewayAkkaImpl(
ActorRef resourceClusterManagerActor,
Duration askTimeout,
ResourceClusterTaskExecutorMapper mapper,
Supplier<Integer> maxConcurrentRequestCount) {
Supplier<Integer> maxConcurrentRequestCount) {
this.resourceClusterManagerActor = resourceClusterManagerActor;
this.askTimeout = askTimeout;
this.mapper = mapper;

log.info("Setting maxConcurrentRequestCount for resourceCluster gateway {}", maxConcurrentRequestCount);
this.rateLimiter = RateLimiter.create(maxConcurrentRequestCount.get());
Expand Down Expand Up @@ -109,11 +105,7 @@ private CompletableFuture<Ack> registerTaskExecutorImpl(TaskExecutorRegistration
return Patterns
.ask(resourceClusterManagerActor, registration, askTimeout)
.thenApply(Ack.class::cast)
.toCompletableFuture()
.whenComplete((dontCare, throwable) ->
mapper.onTaskExecutorDiscovered(
registration.getClusterID(),
registration.getTaskExecutorID()));
.toCompletableFuture();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.resourcecluster.ResourceCluster;
import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper;
import io.mantisrx.server.master.resourcecluster.ResourceClusters;
import io.mantisrx.server.master.scheduler.JobMessageRouter;
import java.time.Clock;
Expand All @@ -48,7 +47,6 @@ public class ResourceClustersAkkaImpl implements ResourceClusters {

private final ActorRef resourceClustersManagerActor;
private final Duration askTimeout;
private final ResourceClusterTaskExecutorMapper mapper;
private final Supplier<Integer> rateLimitPerSecond;
private final ConcurrentMap<ClusterID, ResourceCluster> cache =
new ConcurrentHashMap<>();
Expand All @@ -62,8 +60,7 @@ public ResourceCluster getClusterFor(ClusterID clusterID) {
resourceClustersManagerActor,
askTimeout,
clusterID,
mapper,
rateLimitPerSecond));
rateLimitPerSecond));
return cache.get(clusterID);
}

Expand All @@ -90,12 +87,10 @@ public static ResourceClusters load(
ResourceClustersManagerActor.props(masterConfiguration.getConfig(), Clock.systemDefaultZone(),
rpcService, mantisJobStore, resourceClusterHostActorRef, persistenceProvider,
jobMessageRouter));
final ResourceClusterTaskExecutorMapper globalMapper =
ResourceClusterTaskExecutorMapper.inMemory();

final Duration askTimeout = java.time.Duration.ofMillis(
ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs());
final Supplier<Integer> rateLimitPerSecond = () -> masterConfiguration.getConfig().getResourceClusterActionsPermitsPerSecond();
return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, globalMapper, rateLimitPerSecond);
return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, rateLimitPerSecond);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.TaskExecutorNotFoundException;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.TaskExecutorStatus;
import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper;
import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest;
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
Expand Down Expand Up @@ -155,7 +154,6 @@ public class ResourceClusterActorTest {
private final TaskExecutorGateway gateway = mock(TaskExecutorGateway.class);

private MantisJobStore mantisJobStore;
private ResourceClusterTaskExecutorMapper mapper;
private ActorRef resourceClusterActor;
private ResourceCluster resourceCluster;
private JobMessageRouter jobMessageRouter;
Expand All @@ -175,7 +173,6 @@ public static void teardown() {
public void setupRpcService() {
rpcService.registerGateway(TASK_EXECUTOR_ADDRESS, gateway);
mantisJobStore = mock(MantisJobStore.class);
mapper = ResourceClusterTaskExecutorMapper.inMemory();
jobMessageRouter = mock(JobMessageRouter.class);
}

Expand All @@ -201,7 +198,6 @@ public void setupActor() {
resourceClusterActor,
Duration.ofSeconds(1),
CLUSTER_ID,
mapper,
() -> 10000);
}

Expand Down

0 comments on commit 81bce54

Please sign in to comment.