diff --git a/gradle.properties b/gradle.properties index 32516245c..5d711d4d7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -org.gradle.parallel=false +org.gradle.parallel=true org.gradle.caching=false diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceClusterTaskExecutorMapper.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceClusterTaskExecutorMapper.java deleted file mode 100644 index 0e96f2de9..000000000 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceClusterTaskExecutorMapper.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2022 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.mantisrx.server.master.resourcecluster; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import javax.annotation.Nullable; - -/** - * This is a helper abstraction to find cluster ids for task executors. - * For the foreseeable future, we expect only one implementation of this particular class. - */ -public interface ResourceClusterTaskExecutorMapper { - @Nullable - ClusterID getClusterFor(TaskExecutorID taskExecutorID); - - void onTaskExecutorDiscovered(ClusterID clusterID, TaskExecutorID taskExecutorID); - - static ResourceClusterTaskExecutorMapper inMemory() { - return new ResourceClusterTaskExecutorMapper() { - private final ConcurrentMap map = - new ConcurrentHashMap<>(); - - @Override - public ClusterID getClusterFor(TaskExecutorID taskExecutorID) { - return map.get(taskExecutorID); - } - - @Override - public void onTaskExecutorDiscovered(ClusterID clusterID, TaskExecutorID taskExecutorID) { - map.putIfAbsent(taskExecutorID, clusterID); - } - }; - } -} diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersLeaderExclusiveRoute.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersLeaderExclusiveRoute.java index 22ded04e7..409517dae 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersLeaderExclusiveRoute.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/ResourceClustersLeaderExclusiveRoute.java @@ -18,9 +18,13 @@ import static akka.http.javadsl.server.PathMatchers.segment; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.server.PathMatcher0; import akka.http.javadsl.server.PathMatchers; +import akka.http.javadsl.server.Rejection; import akka.http.javadsl.server.Route; +import akka.http.javadsl.server.directives.LogEntry; import io.mantisrx.master.api.akka.route.Jackson; import io.mantisrx.server.master.resourcecluster.ClusterID; import io.mantisrx.server.master.resourcecluster.ResourceClusters; @@ -28,6 +32,8 @@ import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat; import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration; import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange; +import java.util.List; +import java.util.Optional; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -42,16 +48,26 @@ @Slf4j @RequiredArgsConstructor public class ResourceClustersLeaderExclusiveRoute extends BaseRoute { + private static final PathMatcher0 RESOURCECLUSTERS_API_PREFIX = segment("api").slash("v1").slash("resourceClusters"); private final ResourceClusters gateway; + private Optional onRequestCompletion(HttpRequest request, HttpResponse response) { + log.debug("ResourceClustersLeaderExclusiveRoute: {} {}", request, response); + return Optional.empty(); + } + + private Optional onRequestRejection(HttpRequest request, List rejections) { + return Optional.empty(); + } + @Override protected Route constructRoutes() { return pathPrefix( RESOURCECLUSTERS_API_PREFIX, - () -> concat( + () -> logRequestResultOptional(this::onRequestCompletion, this::onRequestRejection, () -> concat( // /{}/actions/registerTaskExecutor path( PathMatchers.segment().slash("actions").slash("registerTaskExecutor"), @@ -87,7 +103,7 @@ protected Route constructRoutes() { post(() -> disconnectTaskExecutor(getClusterID(clusterName))) )) ) - )); + ))); } private Route registerTaskExecutor(ClusterID clusterID) { @@ -127,7 +143,8 @@ private Route notifyTaskExecutorStatusChange(ClusterID clusterID) { clusterID.getResourceID(), request); - return withFuture(gateway.getClusterFor(clusterID).notifyTaskExecutorStatusChange(request)); + return withFuture( + gateway.getClusterFor(clusterID).notifyTaskExecutorStatusChange(request)); }); } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index f3975d4cb..ff95e25a7 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -546,6 +546,7 @@ private boolean isTaskExecutorDisabled(TaskExecutorRegistration registration) { } private void onHeartbeat(TaskExecutorHeartbeat heartbeat) { + log.debug("Received heartbeat {} from task executor {}", heartbeat, heartbeat.getTaskExecutorID()); setupTaskExecutorStateIfNecessary(heartbeat.getTaskExecutorID()); try { final TaskExecutorID taskExecutorID = heartbeat.getTaskExecutorID(); @@ -553,13 +554,17 @@ private void onHeartbeat(TaskExecutorHeartbeat heartbeat) { if (state.getRegistration() == null || !state.isRegistered()) { TaskExecutorRegistration registration = this.mantisJobStore.getTaskExecutor(heartbeat.getTaskExecutorID()); if (registration != null) { - state.onRegistration(registration); + log.debug("Found registration {} for task executor {}", registration, heartbeat.getTaskExecutorID()); + Preconditions.checkState(state.onRegistration(registration)); } else { // TODO(sundaram): add a metric log.warn("Received heartbeat from unknown task executor {}", heartbeat.getTaskExecutorID()); sender().tell(new Status.Failure(new TaskExecutorNotFoundException(taskExecutorID)), self()); return; } + } else { + log.debug("Found registration {} for registered task executor {}", + state.getRegistration(), heartbeat.getTaskExecutorID()); } boolean stateChange = state.onHeartbeat(heartbeat); if (stateChange && state.isAvailable()) { @@ -567,6 +572,7 @@ private void onHeartbeat(TaskExecutorHeartbeat heartbeat) { } updateHeartbeatTimeout(heartbeat.getTaskExecutorID()); + log.debug("Successfully processed heartbeat {} from task executor {}", heartbeat, heartbeat.getTaskExecutorID()); sender().tell(Ack.getInstance(), self()); } catch (Exception e) { sender().tell(new Status.Failure(e), self()); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java index 997680b05..abccb93cf 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java @@ -45,7 +45,6 @@ import io.mantisrx.server.master.resourcecluster.ContainerSkuID; import io.mantisrx.server.master.resourcecluster.PagedActiveJobOverview; import io.mantisrx.server.master.resourcecluster.ResourceCluster; -import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper; import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest; import io.mantisrx.server.master.resourcecluster.TaskExecutorID; import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration; @@ -62,17 +61,14 @@ class ResourceClusterAkkaImpl extends ResourceClusterGatewayAkkaImpl implements ResourceCluster { private final ClusterID clusterID; - private final ResourceClusterTaskExecutorMapper mapper; public ResourceClusterAkkaImpl( ActorRef resourceClusterManagerActor, Duration askTimeout, ClusterID clusterID, - ResourceClusterTaskExecutorMapper mapper, Supplier rateLimitPerSecond) { - super(resourceClusterManagerActor, askTimeout, mapper, rateLimitPerSecond); + super(resourceClusterManagerActor, askTimeout, rateLimitPerSecond); this.clusterID = clusterID; - this.mapper = mapper; } @Override @@ -87,9 +83,7 @@ public CompletableFuture initializeTaskExecutor(TaskExecutorID taskExecutor new InitializeTaskExecutorRequest(taskExecutorID, workerId), askTimeout) .thenApply(Ack.class::cast) - .toCompletableFuture() - .whenComplete((ack, dontCare) -> - mapper.onTaskExecutorDiscovered(clusterID, taskExecutorID)); + .toCompletableFuture(); } @Override diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java index 6a43ffbc1..b31d20de4 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java @@ -25,7 +25,6 @@ import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.server.master.resourcecluster.RequestThrottledException; import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway; -import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper; import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection; import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat; import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration; @@ -44,7 +43,6 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway { protected final ActorRef resourceClusterManagerActor; protected final Duration askTimeout; - private final ResourceClusterTaskExecutorMapper mapper; private final Counter registrationCounter; private final Counter heartbeatCounter; private final Counter disconnectionCounter; @@ -58,11 +56,9 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway { ResourceClusterGatewayAkkaImpl( ActorRef resourceClusterManagerActor, Duration askTimeout, - ResourceClusterTaskExecutorMapper mapper, - Supplier maxConcurrentRequestCount) { + Supplier maxConcurrentRequestCount) { this.resourceClusterManagerActor = resourceClusterManagerActor; this.askTimeout = askTimeout; - this.mapper = mapper; log.info("Setting maxConcurrentRequestCount for resourceCluster gateway {}", maxConcurrentRequestCount); this.rateLimiter = RateLimiter.create(maxConcurrentRequestCount.get()); @@ -109,11 +105,7 @@ private CompletableFuture registerTaskExecutorImpl(TaskExecutorRegistration return Patterns .ask(resourceClusterManagerActor, registration, askTimeout) .thenApply(Ack.class::cast) - .toCompletableFuture() - .whenComplete((dontCare, throwable) -> - mapper.onTaskExecutorDiscovered( - registration.getClusterID(), - registration.getTaskExecutorID())); + .toCompletableFuture(); } @Override diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersAkkaImpl.java index 1d6bcebb8..028a7f6a2 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersAkkaImpl.java @@ -25,7 +25,6 @@ import io.mantisrx.server.master.persistence.MantisJobStore; import io.mantisrx.server.master.resourcecluster.ClusterID; import io.mantisrx.server.master.resourcecluster.ResourceCluster; -import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper; import io.mantisrx.server.master.resourcecluster.ResourceClusters; import io.mantisrx.server.master.scheduler.JobMessageRouter; import java.time.Clock; @@ -48,7 +47,6 @@ public class ResourceClustersAkkaImpl implements ResourceClusters { private final ActorRef resourceClustersManagerActor; private final Duration askTimeout; - private final ResourceClusterTaskExecutorMapper mapper; private final Supplier rateLimitPerSecond; private final ConcurrentMap cache = new ConcurrentHashMap<>(); @@ -62,8 +60,7 @@ public ResourceCluster getClusterFor(ClusterID clusterID) { resourceClustersManagerActor, askTimeout, clusterID, - mapper, - rateLimitPerSecond)); + rateLimitPerSecond)); return cache.get(clusterID); } @@ -90,12 +87,10 @@ public static ResourceClusters load( ResourceClustersManagerActor.props(masterConfiguration.getConfig(), Clock.systemDefaultZone(), rpcService, mantisJobStore, resourceClusterHostActorRef, persistenceProvider, jobMessageRouter)); - final ResourceClusterTaskExecutorMapper globalMapper = - ResourceClusterTaskExecutorMapper.inMemory(); final Duration askTimeout = java.time.Duration.ofMillis( ConfigurationProvider.getConfig().getMasterApiAskTimeoutMs()); final Supplier rateLimitPerSecond = () -> masterConfiguration.getConfig().getResourceClusterActionsPermitsPerSecond(); - return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, globalMapper, rateLimitPerSecond); + return new ResourceClustersAkkaImpl(resourceClusterManagerActor, askTimeout, rateLimitPerSecond); } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java index 6af22a83f..2b44591b8 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java @@ -48,7 +48,6 @@ import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview; import io.mantisrx.server.master.resourcecluster.ResourceCluster.TaskExecutorNotFoundException; import io.mantisrx.server.master.resourcecluster.ResourceCluster.TaskExecutorStatus; -import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper; import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest; import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat; import io.mantisrx.server.master.resourcecluster.TaskExecutorID; @@ -155,7 +154,6 @@ public class ResourceClusterActorTest { private final TaskExecutorGateway gateway = mock(TaskExecutorGateway.class); private MantisJobStore mantisJobStore; - private ResourceClusterTaskExecutorMapper mapper; private ActorRef resourceClusterActor; private ResourceCluster resourceCluster; private JobMessageRouter jobMessageRouter; @@ -175,7 +173,6 @@ public static void teardown() { public void setupRpcService() { rpcService.registerGateway(TASK_EXECUTOR_ADDRESS, gateway); mantisJobStore = mock(MantisJobStore.class); - mapper = ResourceClusterTaskExecutorMapper.inMemory(); jobMessageRouter = mock(JobMessageRouter.class); } @@ -201,7 +198,6 @@ public void setupActor() { resourceClusterActor, Duration.ofSeconds(1), CLUSTER_ID, - mapper, () -> 10000); }