From e90032d3ea94088c5858550aea44f91523551903 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Tue, 17 Dec 2024 15:24:58 -0800 Subject: [PATCH 1/9] Fix scheduler double booking task executors (#733) * fix TE double booking * format --- .../ExecutorStateManagerImpl.java | 25 +++++++++- .../resourcecluster/ResourceClusterActor.java | 48 ++++++++++++++++--- .../ResourceClustersManagerActor.java | 1 + .../resourcecluster/TaskExecutorState.java | 15 ++++++ .../master/config/MasterConfiguration.java | 4 ++ .../ExecutorStateManagerTests.java | 19 +++++++- ...ourceClusterActorClusterUsageAkkaTest.java | 2 + .../ResourceClusterActorTest.java | 2 + 8 files changed, 107 insertions(+), 9 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java index 02489a5d0..59bec4297 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java @@ -39,6 +39,8 @@ import io.mantisrx.shaded.com.google.common.cache.Cache; import io.mantisrx.shaded.com.google.common.cache.CacheBuilder; import io.mantisrx.shaded.com.google.common.cache.RemovalListener; + +import java.time.Duration; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Comparator; @@ -111,6 +113,8 @@ private TaskExecutorGroupKey findBestFitGroupOrDefault(SchedulingConstraints con private final Map schedulingAttributes; + private final Duration schedulerLeaseExpirationDuration; + private final Cache archivedState = CacheBuilder.newBuilder() .maximumSize(10000) .expireAfterWrite(24, TimeUnit.HOURS) @@ -121,11 +125,16 @@ private TaskExecutorGroupKey findBestFitGroupOrDefault(SchedulingConstraints con ExecutorStateManagerImpl(Map schedulingAttributes) { this.schedulingAttributes = schedulingAttributes; this.fitnessCalculator = new CpuWeightedFitnessCalculator(); + this.schedulerLeaseExpirationDuration = Duration.ofMillis(100); } - ExecutorStateManagerImpl(Map schedulingAttributes, FitnessCalculator fitnessCalculator) { + ExecutorStateManagerImpl( + Map schedulingAttributes, + FitnessCalculator fitnessCalculator, + Duration schedulerLeaseExpirationDuration) { this.schedulingAttributes = schedulingAttributes; this.fitnessCalculator = fitnessCalculator; + this.schedulerLeaseExpirationDuration = schedulerLeaseExpirationDuration; } @Override @@ -357,10 +366,22 @@ private Optional> findBestFitFor(TaskExec } TaskExecutorState st = this.taskExecutorStateMap.get(teHolder.getId()); return st.isAvailable() && + // when a TE is returned from here to be used for scheduling, its state remain active until + // the scheduler trigger another message to update (lock) the state. However when large number + // of the requests are active at the same time on same sku, the gap between here and the message + // to lock the state can be large so another schedule request message can be in between and + // got the same set of TEs. To avoid this, a lease is added to each TE state to temporarily + // lock the TE to be used again. Since this is only lock between actor messages and lease + // duration can be short. + st.getLastSchedulerLeasedDuration().compareTo(this.schedulerLeaseExpirationDuration) > 0 && st.getRegistration() != null; }) .limit(numWorkers) - .map(TaskExecutorHolder::getId) + .map(teHolder -> { + TaskExecutorState st = this.taskExecutorStateMap.get(teHolder.getId()); + st.updateLastSchedulerLeased(); + return teHolder.getId(); + }) .collect(Collectors.toMap( taskExecutorID -> taskExecutorID, this.taskExecutorStateMap::get))); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index 20f3e3995..2dbd2ad48 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -120,6 +120,7 @@ public SupervisorStrategy supervisorStrategy() { private final Duration heartbeatTimeout; private final Duration assignmentTimeout; private final Duration disabledTaskExecutorsCheckInterval; + private final Duration schedulerLeaseExpirationDuration; private final ExecutorStateManager executorStateManager; private final Clock clock; @@ -139,9 +140,39 @@ public SupervisorStrategy supervisorStrategy() { private final boolean isJobArtifactCachingEnabled; - static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, Duration assignmentTimeout, Duration disabledTaskExecutorsCheckInterval, Clock clock, RpcService rpcService, MantisJobStore mantisJobStore, JobMessageRouter jobMessageRouter, int maxJobArtifactsToCache, String jobClustersWithArtifactCachingEnabled, boolean isJobArtifactCachingEnabled, Map schedulingAttributes, FitnessCalculator fitnessCalculator) { - return Props.create(ResourceClusterActor.class, clusterID, heartbeatTimeout, assignmentTimeout, disabledTaskExecutorsCheckInterval, clock, rpcService, mantisJobStore, jobMessageRouter, maxJobArtifactsToCache, jobClustersWithArtifactCachingEnabled, isJobArtifactCachingEnabled, schedulingAttributes, fitnessCalculator) - .withMailbox("akka.actor.metered-mailbox"); + static Props props( + final ClusterID clusterID, + final Duration heartbeatTimeout, + Duration assignmentTimeout, + Duration disabledTaskExecutorsCheckInterval, + Duration schedulerLeaseExpirationDuration, + Clock clock, + RpcService rpcService, + MantisJobStore mantisJobStore, + JobMessageRouter jobMessageRouter, + int maxJobArtifactsToCache, + String jobClustersWithArtifactCachingEnabled, + boolean isJobArtifactCachingEnabled, + Map schedulingAttributes, + FitnessCalculator fitnessCalculator + ) { + return Props.create( + ResourceClusterActor.class, + clusterID, + heartbeatTimeout, + assignmentTimeout, + disabledTaskExecutorsCheckInterval, + schedulerLeaseExpirationDuration, + clock, + rpcService, + mantisJobStore, + jobMessageRouter, + maxJobArtifactsToCache, + jobClustersWithArtifactCachingEnabled, + isJobArtifactCachingEnabled, + schedulingAttributes, + fitnessCalculator + ).withMailbox("akka.actor.metered-mailbox"); } ResourceClusterActor( @@ -149,6 +180,7 @@ static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, D Duration heartbeatTimeout, Duration assignmentTimeout, Duration disabledTaskExecutorsCheckInterval, + Duration schedulerLeaseExpirationDuration, Clock clock, RpcService rpcService, MantisJobStore mantisJobStore, @@ -162,6 +194,7 @@ static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, D this.heartbeatTimeout = heartbeatTimeout; this.assignmentTimeout = assignmentTimeout; this.disabledTaskExecutorsCheckInterval = disabledTaskExecutorsCheckInterval; + this.schedulerLeaseExpirationDuration = schedulerLeaseExpirationDuration; this.isJobArtifactCachingEnabled = isJobArtifactCachingEnabled; this.clock = clock; @@ -173,7 +206,8 @@ static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, D this.maxJobArtifactsToCache = maxJobArtifactsToCache; this.jobClustersWithArtifactCachingEnabled = jobClustersWithArtifactCachingEnabled; - this.executorStateManager = new ExecutorStateManagerImpl(schedulingAttributes, fitnessCalculator); + this.executorStateManager = new ExecutorStateManagerImpl( + schedulingAttributes, fitnessCalculator, this.schedulerLeaseExpirationDuration); this.metrics = new ResourceClusterActorMetrics(); } @@ -523,10 +557,12 @@ private void onDisableTaskExecutorsRequestExpiry(ExpireDisableTaskExecutorsReque if (request.getRequest().getTaskExecutorID().isPresent()) { final TaskExecutorState state = this.executorStateManager.get( request.getRequest().getTaskExecutorID().get()); - state.onNodeEnabled(); + if (state != null) { + state.onNodeEnabled(); + } } } catch (Exception e) { - log.error("Failed to delete expired {}", request.getRequest()); + log.error("Failed to delete expired {}", request.getRequest(), e); } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java index ae2bcb96b..7a8d7b82a 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java @@ -174,6 +174,7 @@ private ActorRef createResourceClusterActorFor(ClusterID clusterID) { Duration.ofMillis(masterConfiguration.getHeartbeatIntervalInMs()), Duration.ofMillis(masterConfiguration.getAssignmentIntervalInMs()), Duration.ofMillis(masterConfiguration.getAssignmentIntervalInMs()), + Duration.ofMillis(masterConfiguration.getSchedulerLeaseExpirationDurationInMs()), clock, rpcService, mantisJobStore, diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java index a1da3263d..767e71842 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java @@ -32,6 +32,7 @@ import io.mantisrx.server.master.scheduler.WorkerOnDisabledVM; import io.mantisrx.server.worker.TaskExecutorGateway; import java.time.Clock; +import java.time.Duration; import java.time.Instant; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -62,6 +63,9 @@ enum RegistrationState { // last interaction initiated by the task executor private Instant lastActivity; + + // last interaction time when this instance was leased by the scheduler in findBestFit. + private Instant lastSchedulerLeased; private final Clock clock; private final RpcService rpcService; private final JobMessageRouter jobMessageRouter; @@ -78,6 +82,7 @@ static TaskExecutorState of(Clock clock, RpcService rpcService, JobMessageRouter null, false, clock.instant(), + Instant.MIN, clock, rpcService, jobMessageRouter, @@ -278,6 +283,16 @@ Instant getLastActivity() { return this.lastActivity; } + Duration getLastSchedulerLeasedDuration() + { + return Duration.between(this.lastSchedulerLeased, this.clock.instant()); + } + + void updateLastSchedulerLeased() + { + this.lastSchedulerLeased = this.clock.instant(); + } + TaskExecutorRegistration getRegistration() { return this.registration; } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java index 289ca587f..9c7c06422 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java @@ -329,6 +329,10 @@ default Duration getSchedulerIntervalBetweenRetries() { @Default("60000") // 1 minute int getAssignmentIntervalInMs(); + @Config("mantis.agent.assignment.scheduler.lease.ms") + @Default("100") + int getSchedulerLeaseExpirationDurationInMs(); + @Config("mantis.job.costsCalculator.class") @Default("io.mantisrx.master.jobcluster.job.NoopCostsCalculator") CostsCalculator getJobCostsCalculator(); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerTests.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerTests.java index d80e93286..3475ef80a 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerTests.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerTests.java @@ -291,6 +291,7 @@ public void testGetBestFit_WithGenerationFromScaleGroup() { ATTRIBUTES_WITH_SCALE_GROUP_1, stateManager); + this.actual.set(Clock.fixed(Instant.ofEpochSecond(2), ZoneId.systemDefault())); bestFitO = stateManager.findBestFit( new TaskExecutorBatchAssignmentRequest( @@ -364,12 +365,14 @@ private TaskExecutorState registerNewTaskExecutor(TaskExecutorID id, MachineDefi } @Test - public void testGetBestFit_WithDifferentResourcesSameSku() { + public void testGetBestFit_WithDifferentResourcesSameSku() throws InterruptedException { registerNewTaskExecutor(TASK_EXECUTOR_ID_1, MACHINE_DEFINITION_2, ATTRIBUTES_WITH_SCALE_GROUP_2, stateManager); + this.actual.set(Clock.fixed(Instant.ofEpochSecond(2), ZoneId.systemDefault())); + // should get te1 with group2 Optional bestFitO = stateManager.findBestFit( @@ -386,6 +389,19 @@ public void testGetBestFit_WithDifferentResourcesSameSku() { ATTRIBUTES_WITH_SCALE_GROUP_1, stateManager); + // do not move the clock, should still get nothing as scheduler lease is still valid + bestFitO = + stateManager.findBestFit( + new TaskExecutorBatchAssignmentRequest( + new HashSet<>(Arrays.asList( + TaskExecutorAllocationRequest.of(WORKER_ID, SchedulingConstraints.of(MACHINE_DEFINITION_2), null, 0), + TaskExecutorAllocationRequest.of(WORKER_ID, SchedulingConstraints.of(MACHINE_DEFINITION_1), null, 1))), + CLUSTER_ID)); + + assertFalse(bestFitO.isPresent()); + + this.actual.set(Clock.fixed(Instant.ofEpochSecond(3), ZoneId.systemDefault())); + bestFitO = stateManager.findBestFit( new TaskExecutorBatchAssignmentRequest( @@ -605,6 +621,7 @@ public void testGetBestFit_WithSameCoresDifferentMemory() { stateManager.tryMarkAvailable(TaskExecutorID.of("te3")); // matching found for 2cores, 14GB since the fit TE shape is now 2cores, 14GB and there are 2 TE available + this.actual.set(Clock.fixed(Instant.ofEpochSecond(2), ZoneId.systemDefault())); bestFit = stateManager.findBestFit(new TaskExecutorBatchAssignmentRequest( new HashSet<>(Arrays.asList( diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorClusterUsageAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorClusterUsageAkkaTest.java index 5c47eb9ce..478ce6de9 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorClusterUsageAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorClusterUsageAkkaTest.java @@ -72,6 +72,7 @@ public class ResourceClusterActorClusterUsageAkkaTest { private static final ClusterID CLUSTER_ID = ClusterID.of("clusterId"); private static final Duration heartbeatTimeout = Duration.ofSeconds(10); private static final Duration checkForDisabledExecutorsInterval = Duration.ofSeconds(10); + private static final Duration schedulerLeaseExpirationDuration = Duration.ofMillis(100); private static final Duration assignmentTimeout = Duration.ofSeconds(1); private static final String HOST_NAME = "hostname"; private static final WorkerPorts WORKER_PORTS = new WorkerPorts(1, 2, 3, 4, 5); @@ -166,6 +167,7 @@ public void setupActor() throws Exception { heartbeatTimeout, assignmentTimeout, checkForDisabledExecutorsInterval, + schedulerLeaseExpirationDuration, Clock.systemDefaultZone(), rpcService, mantisJobStore, diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java index d1642bdb1..8fd915b03 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java @@ -102,6 +102,7 @@ public class ResourceClusterActorTest { private static final ClusterID CLUSTER_ID = ClusterID.of("clusterId"); private static final Duration heartbeatTimeout = Duration.ofSeconds(10); private static final Duration checkForDisabledExecutorsInterval = Duration.ofSeconds(10); + private static final Duration schedulerLeaseExpirationDuration = Duration.ofMillis(100); private static final Duration assignmentTimeout = Duration.ofSeconds(1); private static final String HOST_NAME = "hostname"; @@ -201,6 +202,7 @@ public void setupActor() { heartbeatTimeout, assignmentTimeout, checkForDisabledExecutorsInterval, + schedulerLeaseExpirationDuration, Clock.systemDefaultZone(), rpcService, mantisJobStore, From 35a0826af9af829bf3b933b277b8b813d125ef8a Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Wed, 18 Dec 2024 12:46:59 -0800 Subject: [PATCH 2/9] Fix worker no heartbeat race condition (#734) * fix worker no heartbeat race condition * log refactor --- .../master/jobcluster/job/JobActor.java | 24 ++-- .../jobcluster/job/JobTestLifecycle.java | 110 ++++++++++++++++-- 2 files changed, 117 insertions(+), 17 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java index 6c6f457db..b879b8be5 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java @@ -1943,15 +1943,25 @@ public void checkHeartBeats(Instant currentTime) { acceptedAt); } } else { - if (Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds() + // no heartbeat or heartbeat too old + if (!workerMeta.getLastHeartbeatAt().isPresent() || Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds() > missedHeartBeatToleranceSecs) { - // heartbeat too old this.numWorkerMissingHeartbeat.increment(); - LOGGER.info("Job {}, Worker {} Duration between last heartbeat and now {} " - + "missed heart beat threshold {} exceeded", this.jobMgr.getJobId(), - workerMeta.getWorkerId(), Duration.between( - workerMeta.getLastHeartbeatAt().get(), - currentTime).getSeconds(), missedHeartBeatToleranceSecs); + + if (!workerMeta.getLastHeartbeatAt().isPresent()) { + LOGGER.warn("Job {}, Worker {} hasn't received heartbeat, threshold {} exceeded", + this.jobMgr.getJobId(), + workerMeta.getWorkerId(), + missedHeartBeatToleranceSecs); + } else { + LOGGER.warn("Job {}, Worker {} Duration between last heartbeat and now {} " + + "missed heart beat threshold {} exceeded", + this.jobMgr.getJobId(), + workerMeta.getWorkerId(), + Duration.between( + workerMeta.getLastHeartbeatAt().get(), + currentTime).getSeconds(), missedHeartBeatToleranceSecs); + } if (ConfigurationProvider.getConfig().isHeartbeatTerminationEnabled()) { eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(WARN, diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java index 0fbbdc35c..31ec5c45d 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java @@ -23,10 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -118,6 +115,7 @@ public void testJobSubmitWithoutInit() { try { jobDefn = JobTestHelper.generateJobDefinition(clusterName); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); MantisJobStore jobStoreMock = mock(MantisJobStore.class); MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() .withJobId(new JobId(clusterName,1)) @@ -151,6 +149,7 @@ public void testJobSubmit() { // IMantisStorageProvider storageProvider = new SimpleCachedFileStorageProvider(); // MantisJobStore jobStore = new MantisJobStore(storageProvider); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); MantisJobStore jobStoreMock = mock(MantisJobStore.class); MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() .withJobId(new JobId(clusterName,1)) @@ -246,6 +245,7 @@ public void testJobSubmitPerpetual() { .withJobSla(new JobSla(0, 0, null, MantisJobDurationType.Perpetual, null)) .build(); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); MantisJobStore jobStoreMock = mock(MantisJobStore.class); MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() .withJobId(new JobId(clusterName,1)) @@ -338,6 +338,7 @@ public void testJobSubmitInitalizationFails() { try { jobDefn = JobTestHelper.generateJobDefinition(clusterName); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); MantisJobStore jobStoreMock = mock(MantisJobStore.class); Mockito.doThrow(IOException.class).when(jobStoreMock).storeNewJob(any()); MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() @@ -385,6 +386,7 @@ public void testJobSubmitWithMultipleWorkers() { jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); MantisJobStore jobStoreMock = mock(MantisJobStore.class); MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() .withJobId(new JobId(clusterName,2)) @@ -497,6 +499,7 @@ public void testJobSubmitWithMultipleStagesAndWorkers() { jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); MantisJobStore jobStoreMock = mock(MantisJobStore.class); MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() .withJobId(new JobId(clusterName,1)) @@ -605,6 +608,7 @@ public void testListActiveWorkers() { jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); MantisJobStore jobStoreMock = mock(MantisJobStore.class); MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() .withJobId(new JobId(clusterName,2)) @@ -709,6 +713,7 @@ public void testkill() throws Exception { JobDefinition jobDefn = JobTestHelper.generateJobDefinition(clusterName); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); MantisJobStore jobStoreMock = mock(MantisJobStore.class); MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() .withJobId(new JobId(clusterName,3)) @@ -741,7 +746,7 @@ public void testkill() throws Exception { } @Test - public void testHeartBeatMissingResubmit() { + public void testNoHeartBeatAfterLaunchResubmit() { final TestKit probe = new TestKit(system); String clusterName= "testHeartBeatMissingResubmit"; IJobClusterDefinition jobClusterDefn = JobTestHelper.generateJobClusterDefinition(clusterName); @@ -753,6 +758,8 @@ public void testHeartBeatMissingResubmit() { jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); + MantisJobStore jobStoreMock = mock(MantisJobStore.class); MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() .withJobId(new JobId(clusterName,2)) @@ -792,13 +799,95 @@ public void testHeartBeatMissingResubmit() { // 2 worker have started so job should be started. assertEquals(JobState.Accepted, resp3.getJobMetadata().get().getState()); - JobTestHelper.sendHeartBeat(probe, jobActor, jobId,1, workerId2); + Instant now = Instant.now(); + jobActor.tell(new JobProto.CheckHeartBeat(now.plusSeconds(240)), probe.getRef()); + Thread.sleep(1000); + + // 1 original submissions and 0 resubmits because of worker not in launched state with HB timeouts + verify(schedulerMock, times(1)).scheduleWorkers(any()); + // 1 kills due to resubmits + verify(schedulerMock, times(0)).unscheduleAndTerminateWorker(any(), any()); - JobTestHelper.sendHeartBeat(probe, jobActor, jobId,1, workerId); + // launch worker but no HB yet + JobTestHelper.sendWorkerLaunchedEvent(probe, jobActor, workerId2, stageNo); // check hb status in the future where we expect all last HBs to be stale. - Instant now = Instant.now(); + now = Instant.now(); jobActor.tell(new JobProto.CheckHeartBeat(now.plusSeconds(240)), probe.getRef()); + Thread.sleep(1000); + + // job status remain as accepted + jobActor.tell(new JobClusterManagerProto.GetJobDetailsRequest("nj", jobId), probe.getRef()); + GetJobDetailsResponse resp4 = probe.expectMsgClass(GetJobDetailsResponse.class); + assertEquals(SUCCESS, resp4.responseCode); + assertEquals(JobState.Accepted, resp4.getJobMetadata().get().getState()); + + // 1 original submissions and 0 resubmits because of worker not in launched state with HB timeouts + verify(schedulerMock, times(2)).scheduleWorkers(any()); + // 1 kills due to resubmits + verify(schedulerMock, times(1)).unscheduleAndTerminateWorker(eq(workerId2), any()); + } catch (Exception e) { + fail("unexpected exception " + e.getMessage()); + } + } + + @Test + public void testHeartBeatPendingSchedulingNoResubmit() { + final TestKit probe = new TestKit(system); + String clusterName= "testHeartBeatMissingResubmit"; + IJobClusterDefinition jobClusterDefn = JobTestHelper.generateJobClusterDefinition(clusterName); + + JobDefinition jobDefn; + try { + SchedulingInfo sInfo = new SchedulingInfo.Builder().numberOfStages(1).multiWorkerStageWithConstraints(2, new MachineDefinition(1.0,1.0,1.0,3), Lists.newArrayList(), Lists.newArrayList()).build(); + + jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo); + + MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); + MantisJobStore jobStoreMock = mock(MantisJobStore.class); + MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() + .withJobId(new JobId(clusterName,2)) + .withSubmittedAt(Instant.now()) + .withJobState(JobState.Accepted) + + .withNextWorkerNumToUse(1) + .withJobDefinition(jobDefn) + .build(); + final ActorRef jobActor = system.actorOf(JobActor.props(jobClusterDefn, mantisJobMetaData, jobStoreMock, schedulerMock, eventPublisher, costsCalculator)); + + jobActor.tell(new JobProto.InitJob(probe.getRef()), probe.getRef()); + JobProto.JobInitialized initMsg = probe.expectMsgClass(JobProto.JobInitialized.class); + assertEquals(SUCCESS, initMsg.responseCode); + String jobId = clusterName + "-2"; + jobActor.tell(new JobClusterManagerProto.GetJobDetailsRequest("nj", jobId), probe.getRef()); + GetJobDetailsResponse resp = probe.expectMsgClass(GetJobDetailsResponse.class); + assertEquals(SUCCESS, resp.responseCode); + assertEquals(JobState.Accepted,resp.getJobMetadata().get().getState()); + int stageNo = 1; + + WorkerId workerId = new WorkerId(jobId, 0, 1); + // check job status again + jobActor.tell(new JobClusterManagerProto.GetJobDetailsRequest("nj", jobId), probe.getRef()); + GetJobDetailsResponse resp2 = probe.expectMsgClass(GetJobDetailsResponse.class); + assertEquals(SUCCESS, resp2.responseCode); + + // No worker has started. + assertEquals(JobState.Accepted,resp2.getJobMetadata().get().getState()); + WorkerId workerId2 = new WorkerId(jobId, 1, 2); + + // check job status again + jobActor.tell(new JobClusterManagerProto.GetJobDetailsRequest("nj", jobId), probe.getRef()); + GetJobDetailsResponse resp3 = probe.expectMsgClass(GetJobDetailsResponse.class); + assertEquals(SUCCESS, resp3.responseCode); + + // 2 worker have started so job should be started. + assertEquals(JobState.Accepted, resp3.getJobMetadata().get().getState()); + + // trigger HB check far into the future where no retry on scheduling is expected because the worker has not + // switched into launched state yet. + Instant now = Instant.now(); + jobActor.tell(new JobProto.CheckHeartBeat(now.plusSeconds(99999)), probe.getRef()); Thread.sleep(1000); @@ -807,8 +896,7 @@ public void testHeartBeatMissingResubmit() { // 0 kills due to resubmits verify(schedulerMock, times(0)).unscheduleAndTerminateWorker(any(), any()); } catch (Exception e) { - e.printStackTrace(); - fail(); + fail("unexpected exception " + e.getMessage()); } } @@ -825,6 +913,7 @@ public void testHeartBeatEnforcement() { jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); MantisJobStore jobStoreMock = mock(MantisJobStore.class); MantisJobMetadataImpl mantisJobMetaData = new MantisJobMetadataImpl.Builder() .withJobId(new JobId(clusterName,2)) @@ -920,6 +1009,7 @@ public void testLostWorkerGetsReplaced() { jobDefn = JobTestHelper.generateJobDefinition(clusterName, sInfo); MantisScheduler schedulerMock = mock(MantisScheduler.class); + when(schedulerMock.schedulerHandlesAllocationRetries()).thenReturn(true); //MantisJobStore jobStoreMock = mock(MantisJobStore.class); MantisJobStore jobStoreSpied = Mockito.spy(jobStore); From 419c15f4f2eb79e6213cf4183bb97e27e4c7fe5d Mon Sep 17 00:00:00 2001 From: Daniel Trager <43889268+dtrager02@users.noreply.github.com> Date: Fri, 20 Dec 2024 17:38:07 -0800 Subject: [PATCH 3/9] Update CODEOWNERS (#736) Add myself to owners --- .github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 595b2b32d..47da1f9a0 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @calvin681 @sundargates @Andyz26 @hmitnflx @fdc-ntflx +* @calvin681 @sundargates @Andyz26 @hmitnflx @fdc-ntflx @dtrager02 From a18a200a3a27cc0fe35bd1bd8a17e9150aecd436 Mon Sep 17 00:00:00 2001 From: timmartin-stripe <131782471+timmartin-stripe@users.noreply.github.com> Date: Mon, 23 Dec 2024 13:53:15 -0500 Subject: [PATCH 4/9] Use a singleton for DynamoDbMasterMonitor (#737) We encountered an issue where the MasterMonitor's `shutdown` method was called somewhere. This led to clients relying on updates to the MasterMonitor stopping receiving updates but they had no mechanism of knowing that the monitor they were subscribed to was no longer updating because the observable was not completed and getting the latest version would continue to return the last known value (even though that value was not updated). We were deciding between either using a Singleton or tracking down `shutdown` calls to `MasterMonitor` and ensuring that usages could resubscribe if necessary. The latter seemed like it would be prone to breakage in the future (e.g. particularly with the master observable). Additionally, a Singleton pattern seemed fitting given that almost every reasonable use case of Mantis relies on a sustained connection to the Mantis Master. So there's no reason to expect that shutting down the executor pool for updating the master is necessary. Additionally, using a Singleton avoids any unbounded resource leakage (e.g. it is still safe to create N MasterMonitors). --- .../dynamodb/DynamoDBMasterMonitor.java | 160 +------------- .../DynamoDBMasterMonitorSingleton.java | 202 ++++++++++++++++++ ...> DynamoDBMasterMonitorSingletonTest.java} | 29 +-- 3 files changed, 225 insertions(+), 166 deletions(-) create mode 100644 mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorSingleton.java rename mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/{DynamoDBMasterMonitorTest.java => DynamoDBMasterMonitorSingletonTest.java} (82%) diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java index a1be73355..63e03371b 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitor.java @@ -15,180 +15,33 @@ */ package io.mantisrx.extensions.dynamodb; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; -import com.amazonaws.services.dynamodbv2.LockItem; -import io.mantisrx.common.metrics.Counter; -import io.mantisrx.common.metrics.Metrics; -import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.server.core.BaseService; -import io.mantisrx.server.core.json.DefaultObjectMapper; import io.mantisrx.server.core.master.MasterDescription; import io.mantisrx.server.core.master.MasterMonitor; -import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import rx.Observable; -import rx.subjects.BehaviorSubject; @Slf4j public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor { - - private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class); - - - private final ThreadFactory monitorThreadFactory = r -> { - Thread thread = new Thread(r); - thread.setName("dynamodb-monitor-" + System.currentTimeMillis()); - thread.setDaemon(true); // allow JVM to shutdown if monitor is still running - thread.setPriority(Thread.NORM_PRIORITY); - thread.setUncaughtExceptionHandler((t, e) -> logger.error("thread: {} failed with {}", t.getName(), e.getMessage(), e) ); - return thread; - }; - private final ScheduledExecutorService leaderMonitor = - Executors.newScheduledThreadPool(1, monitorThreadFactory); - - // Assuming your lock client's options are in a variable named options - private final AmazonDynamoDBLockClient lockClient; - - private final String partitionKey; - - private final Duration pollInterval; - - private final Duration gracefulShutdown; - - private final BehaviorSubject masterSubject; - - private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance(); - - private final Metrics metrics; - - private final Counter noLockPresentCounter; - private final Counter lockDecodeFailedCounter; - private final Counter nullNextLeaderCounter; + private final DynamoDBMasterMonitorSingleton singleton; /** * Creates a MasterMonitor backed by DynamoDB. This should be used if you are using a {@link DynamoDBLeaderElector} */ public DynamoDBMasterMonitor() { - this(DynamoDBClientSingleton.getLockClient(), - DynamoDBClientSingleton.getPartitionKey(), - Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBLeaderHeartbeatDuration()), - Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBMonitorGracefulShutdownDuration())); - } - - public DynamoDBMasterMonitor( - AmazonDynamoDBLockClient lockClient, - String partitionKey, - Duration pollInterval, - Duration gracefulShutdown) { - masterSubject = BehaviorSubject.create(MasterDescription.MASTER_NULL); - this.lockClient = lockClient; - this.partitionKey = partitionKey; - this.pollInterval = pollInterval; - this.gracefulShutdown = gracefulShutdown; - - Metrics m = new Metrics.Builder() - .id("DynamoDBMasterMonitor") - .addCounter("no_lock_present") - .addCounter("lock_decode_failed") - .addCounter("null_next_leader") - .build(); - this.metrics = MetricsRegistry.getInstance().registerAndGet(m); - - this.noLockPresentCounter = metrics.getCounter("no_lock_present"); - this.lockDecodeFailedCounter = metrics.getCounter("lock_decode_failed"); - this.nullNextLeaderCounter = metrics.getCounter("null_next_leader"); + this.singleton = DynamoDBMasterMonitorSingleton.getInstance(); } @Override - @SuppressWarnings("FutureReturnValueIgnored") - public void start() { - leaderMonitor.scheduleAtFixedRate( - this::getCurrentLeader, 0, pollInterval.toMillis(), TimeUnit.MILLISECONDS); - } + public void start() {} @Override - public void shutdown() { - logger.info("close the lock client"); - try { - lockClient.close(); - } catch (IOException e) { - logger.error("error closing the dynamodb lock client", e); - } - - try { - final boolean isTerminated = - leaderMonitor.awaitTermination(gracefulShutdown.toMillis(), TimeUnit.MILLISECONDS); - if (!isTerminated) { - leaderMonitor.shutdownNow(); - } - } catch (InterruptedException e) { - logger.error("error timeout waiting on leader monitor to terminate executor", e); - } - logger.info("leader monitor shutdown"); - } - - @SuppressWarnings("FutureReturnValueIgnored") - private void getCurrentLeader() { - logger.info("attempting leader lookup"); - final Optional optionalLock = lockClient.getLock(partitionKey, Optional.empty()); - final MasterDescription nextDescription; - if (optionalLock.isPresent()) { - final LockItem lock = optionalLock.get(); - nextDescription = lock.getData().map(this::bytesToMaster).orElse(null); - } else { - nextDescription = null; - logger.warn("no leader found"); - this.noLockPresentCounter.increment(); - } - - if (nextDescription != null) { - updateLeader(nextDescription); - } else { - this.nullNextLeaderCounter.increment(); - } - } - - private void updateLeader(@Nullable MasterDescription nextDescription) { - final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL); - final MasterDescription next = (nextDescription == null) ? MasterDescription.MASTER_NULL : nextDescription; - if (!prev.equals(next)) { - logger.info("leader changer information previous {} and next {}", prev.getHostname(), next.getHostname()); - masterSubject.onNext(next); - } - } - - private MasterDescription bytesToMaster(ByteBuffer data) { - // It is possible that the underlying buffer is read more than once, - // so if the offset of the buffer is at the end, rewind, so we can read it. - if (!data.hasRemaining()) { - data.rewind(); - } - final byte[] bytes = new byte[data.remaining()]; - data.get(bytes); - try { - return jsonMapper.readValue(bytes, MasterDescription.class); - } catch (IOException e) { - logger.error("unable to parse master description bytes: {}", data, e); - this.lockDecodeFailedCounter.increment(); - } - return MasterDescription.MASTER_NULL; - } + public void shutdown() {} @Override public Observable getMasterObservable() { - return masterSubject; + return singleton.getMasterSubject(); } /** @@ -198,8 +51,7 @@ public Observable getMasterObservable() { * @return Latest description of the master */ @Override - @Nullable public MasterDescription getLatestMaster() { - return Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL); + return singleton.getMasterSubject().getValue(); } } diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorSingleton.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorSingleton.java new file mode 100644 index 000000000..8f9afd34e --- /dev/null +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorSingleton.java @@ -0,0 +1,202 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.extensions.dynamodb; + + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient; +import com.amazonaws.services.dynamodbv2.LockItem; +import io.mantisrx.common.metrics.Counter; +import io.mantisrx.common.metrics.Metrics; +import io.mantisrx.common.metrics.MetricsRegistry; +import io.mantisrx.server.core.json.DefaultObjectMapper; +import io.mantisrx.server.core.master.MasterDescription; +import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.subjects.BehaviorSubject; + +class DynamoDBMasterMonitorSingleton { + private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class); + + + private final ThreadFactory monitorThreadFactory = r -> { + Thread thread = new Thread(r); + thread.setName("dynamodb-monitor-" + System.currentTimeMillis()); + thread.setDaemon(true); // allow JVM to shut down if monitor is still running + thread.setPriority(Thread.NORM_PRIORITY); + thread.setUncaughtExceptionHandler((t, e) -> logger.error("thread: {} failed with {}", t.getName(), e.getMessage(), e) ); + return thread; + }; + private final ScheduledExecutorService leaderMonitor = + Executors.newScheduledThreadPool(1, monitorThreadFactory); + + // Assuming your lock client's options are in a variable named options + private final AmazonDynamoDBLockClient lockClient; + + private final String partitionKey; + + private final Duration gracefulShutdown; + + private final BehaviorSubject masterSubject; + + private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance(); + + private final Duration pollInterval; + + private final Counter noLockPresentCounter; + private final Counter lockDecodeFailedCounter; + private final Counter nullNextLeaderCounter; + private final Counter leaderChangedCounter; + private final Counter refreshedLeaderCounter; + + private static volatile DynamoDBMasterMonitorSingleton instance = null; + + public static synchronized DynamoDBMasterMonitorSingleton getInstance() { + if (instance == null) { + instance = new DynamoDBMasterMonitorSingleton(); + Runtime.getRuntime() + .addShutdownHook(new Thread(instance::shutdown, "dynamodb-monitor-shutdown-" + instance.hashCode())); + instance.start(); + } + return instance; + } + + /** + * Creates a MasterMonitor backed by DynamoDB. This should be used if you are using a {@link DynamoDBLeaderElector} + */ + DynamoDBMasterMonitorSingleton() { + this(DynamoDBClientSingleton.getLockClient(), + DynamoDBClientSingleton.getPartitionKey(), + Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBLeaderHeartbeatDuration()), + Duration.parse(DynamoDBClientSingleton.getDynamoDBConf().getDynamoDBMonitorGracefulShutdownDuration())); + } + + DynamoDBMasterMonitorSingleton( + AmazonDynamoDBLockClient lockClient, + String partitionKey, + Duration pollInterval, + Duration gracefulShutdown) { + masterSubject = BehaviorSubject.create(MasterDescription.MASTER_NULL); + this.lockClient = lockClient; + this.partitionKey = partitionKey; + this.pollInterval = pollInterval; + this.gracefulShutdown = gracefulShutdown; + + Metrics m = new Metrics.Builder() + .id("DynamoDBMasterMonitor") + .addCounter("no_lock_present") + .addCounter("lock_decode_failed") + .addCounter("null_next_leader") + .addCounter("refreshed_leader") + .addCounter("leader_changed") + .build(); + Metrics metrics = MetricsRegistry.getInstance().registerAndGet(m); + + this.noLockPresentCounter = metrics.getCounter("no_lock_present"); + this.lockDecodeFailedCounter = metrics.getCounter("lock_decode_failed"); + this.nullNextLeaderCounter = metrics.getCounter("null_next_leader"); + this.refreshedLeaderCounter = metrics.getCounter("refreshed_leader"); + this.leaderChangedCounter = metrics.getCounter("leader_changed"); + } + + public void start() { + logger.info("starting leader monitor"); + leaderMonitor.scheduleAtFixedRate( + this::getCurrentLeader, 0, pollInterval.toMillis(), TimeUnit.MILLISECONDS); + } + + void shutdown() { + logger.info("close the lock client"); + try { + lockClient.close(); + } catch (IOException e) { + logger.error("error closing the dynamodb lock client", e); + } + + try { + final boolean isTerminated = + leaderMonitor.awaitTermination(gracefulShutdown.toMillis(), TimeUnit.MILLISECONDS); + if (!isTerminated) { + leaderMonitor.shutdownNow(); + } + } catch (InterruptedException e) { + logger.error("error timeout waiting on leader monitor to terminate executor", e); + } + masterSubject.onCompleted(); + logger.info("leader monitor shutdown"); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void getCurrentLeader() { + logger.info("attempting leader lookup"); + final Optional optionalLock = lockClient.getLock(partitionKey, Optional.empty()); + final MasterDescription nextDescription; + if (optionalLock.isPresent()) { + final LockItem lock = optionalLock.get(); + nextDescription = lock.getData().map(this::bytesToMaster).orElse(null); + } else { + nextDescription = null; + logger.warn("no leader found"); + this.noLockPresentCounter.increment(); + } + + if (nextDescription != null) { + updateLeader(nextDescription); + } else { + this.nullNextLeaderCounter.increment(); + } + } + + private void updateLeader(MasterDescription nextDescription) { + this.refreshedLeaderCounter.increment(); + final MasterDescription prev = Optional.ofNullable(masterSubject.getValue()).orElse(MasterDescription.MASTER_NULL); + if (!prev.equals(nextDescription)) { + this.leaderChangedCounter.increment(); + logger.info("leader changer information previous {} and next {}", prev.getHostname(), nextDescription.getHostname()); + masterSubject.onNext(nextDescription); + } + } + + private MasterDescription bytesToMaster(ByteBuffer data) { + // It is possible that the underlying buffer is read more than once, + // so if the offset of the buffer is at the end, rewind, so we can read it. + if (!data.hasRemaining()) { + data.rewind(); + } + final byte[] bytes = new byte[data.remaining()]; + data.get(bytes); + try { + return jsonMapper.readValue(bytes, MasterDescription.class); + } catch (IOException e) { + logger.error("unable to parse master description bytes: {}", data, e); + this.lockDecodeFailedCounter.increment(); + } + return MasterDescription.MASTER_NULL; + } + + BehaviorSubject getMasterSubject() { + return masterSubject; + } +} diff --git a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorSingletonTest.java similarity index 82% rename from mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java rename to mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorSingletonTest.java index 5c525a07a..48b772bfc 100644 --- a/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorTest.java +++ b/mantis-control-plane/mantis-control-plane-dynamodb/src/test/java/io/mantisrx/extensions/dynamodb/DynamoDBMasterMonitorSingletonTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.junit.*; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -36,7 +37,7 @@ import rx.observers.TestSubscriber; @RunWith(MockitoJUnitRunner.class) -public class DynamoDBMasterMonitorTest { +public class DynamoDBMasterMonitorSingletonTest { private static final String TABLE_NAME = "mantis-dynamodb-leader-test"; @@ -69,37 +70,39 @@ public void testAfter() throws IOException { @Test public void getCurrentLeader() throws JsonProcessingException, InterruptedException { final String lockKey = "mantis-leader"; - final DynamoDBMasterMonitor m = new DynamoDBMasterMonitor( + final DynamoDBMasterMonitorSingleton m = new DynamoDBMasterMonitorSingleton( lockSupport.getLockClient(), lockKey, DynamoDBLockSupportRule.heartbeatDuration, GRACEFUL ); TestSubscriber testSubscriber = new TestSubscriber<>(); - m.getMasterObservable().subscribe(testSubscriber); + m.getMasterSubject().subscribe(testSubscriber); m.start(); - assertEquals(MasterDescription.MASTER_NULL, m.getLatestMaster()); + assertEquals(MasterDescription.MASTER_NULL, m.getMasterSubject().getValue()); lockSupport.takeLock(lockKey, otherMaster); await() .atLeast(DynamoDBLockSupportRule.heartbeatDuration) .pollDelay(DynamoDBLockSupportRule.heartbeatDuration) .atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2)) - .untilAsserted(() -> assertEquals(otherMaster, m.getLatestMaster())); + .untilAsserted(() -> assertEquals(otherMaster, m.getMasterSubject().getValue())); lockSupport.releaseLock(lockKey); lockSupport.takeLock(lockKey, thatMaster); await() .atLeast(DynamoDBLockSupportRule.heartbeatDuration) .pollDelay(DynamoDBLockSupportRule.heartbeatDuration) .atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2)) - .untilAsserted(() -> assertEquals(m.getLatestMaster(), thatMaster)); + .untilAsserted(() -> assertEquals(m.getMasterSubject().getValue(), thatMaster)); testSubscriber.assertValues(MasterDescription.MASTER_NULL, otherMaster, thatMaster); m.shutdown(); + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertCompleted(); } @Test public void runShutdown() throws IOException { final String key = "dne"; - final DynamoDBMasterMonitor m = new DynamoDBMasterMonitor( + final DynamoDBMasterMonitorSingleton m = new DynamoDBMasterMonitorSingleton( mockLockClient, key, DynamoDBLockSupportRule.heartbeatDuration, @@ -117,14 +120,16 @@ public void runShutdown() throws IOException { @Test public void monitorDoesNotReturnNull() throws IOException, InterruptedException { final String lockKey = "mantis-leader"; - final DynamoDBMasterMonitor m = new DynamoDBMasterMonitor( + final DynamoDBMasterMonitorSingleton m = new DynamoDBMasterMonitorSingleton( lockSupport.getLockClient(), lockKey, DynamoDBLockSupportRule.heartbeatDuration, GRACEFUL ); TestSubscriber testSubscriber = new TestSubscriber<>(); - m.getMasterObservable().subscribe(testSubscriber); + m.getMasterSubject().subscribe(testSubscriber); + // ensure it's not NULL at the start + assertEquals(MasterDescription.MASTER_NULL, m.getMasterSubject().getValue()); m.start(); // Write Null @@ -133,13 +138,13 @@ public void monitorDoesNotReturnNull() throws IOException, InterruptedException .atLeast(DynamoDBLockSupportRule.heartbeatDuration) .pollDelay(DynamoDBLockSupportRule.heartbeatDuration) .atMost(Duration.ofMillis(DynamoDBLockSupportRule.heartbeatDuration.toMillis()*2)) - .untilAsserted(() -> assertEquals(MasterDescription.MASTER_NULL, m.getLatestMaster())); + .untilAsserted(() -> assertEquals(MasterDescription.MASTER_NULL, m.getMasterSubject().getValue())); lockSupport.releaseLock(lockKey); m.shutdown(); - testSubscriber.assertNoTerminalEvent(); - testSubscriber.assertNotCompleted(); + testSubscriber.awaitTerminalEvent(10, TimeUnit.SECONDS); + testSubscriber.assertCompleted(); testSubscriber.assertNoErrors(); Observable.from(testSubscriber.getOnNextEvents()) .forEach(Assert::assertNotNull); From e886a0c4a84a80272c02dc46f0a4858523e41862 Mon Sep 17 00:00:00 2001 From: sundargates Date: Mon, 6 Jan 2025 09:25:51 -0800 Subject: [PATCH 5/9] Removing myself from the codeowners (#739) --- .github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 47da1f9a0..c8f04389b 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @calvin681 @sundargates @Andyz26 @hmitnflx @fdc-ntflx @dtrager02 +* @calvin681 @Andyz26 @hmitnflx @fdc-ntflx @dtrager02 From 320ddcb617ff3a27144628f838a3844d12cfb7c7 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Thu, 9 Jan 2025 14:00:56 -0800 Subject: [PATCH 6/9] fix artifact update backcompat (#742) --- .../proto/JobClusterManagerProto.java | 6 ++++- .../master/jobcluster/JobClusterAkkaTest.java | 24 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java index 5dda906d9..91bf59906 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java @@ -553,7 +553,11 @@ public UpdateJobClusterArtifactRequest( this.clusterName = clusterName; this.artifactName = artifact; - this.jobJarUrl = jobJarUrl != null ? jobJarUrl : "http://" + artifact; + // [Note] in the legacy setup this artifact field is used to host the job jar url field (it maps to the + // json property "url". + this.jobJarUrl = jobJarUrl != null ? + jobJarUrl : + (artifact.startsWith("http://") ? artifact : "http://" + artifact); this.version = version; this.skipSubmit = skipSubmit; this.user = user; diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java index ae68d5d27..c04096a8f 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java @@ -789,6 +789,30 @@ public void testJobClusterMigrationConfigUpdate() throws Exception { verify(jobStoreMock, times(1)).createJobCluster(any()); } + @Test + public void testJobClusterArtifactUpdateBackCompat() throws Exception { + String clusterName = "testJobClusterArtifactUpdateBackCompat"; + UpdateJobClusterArtifactRequest req = new UpdateJobClusterArtifactRequest( + clusterName, + "http://path1/artifact1-1.zip", + null, + "1", + true, + "user"); + assertEquals("http://path1/artifact1-1.zip", req.getArtifactName()); + assertEquals("http://path1/artifact1-1.zip", req.getjobJarUrl()); + + UpdateJobClusterArtifactRequest req2 = new UpdateJobClusterArtifactRequest( + clusterName, + "artifact1-1.zip", + null, + "1", + true, + "user"); + assertEquals("artifact1-1.zip", req2.getArtifactName()); + assertEquals("http://artifact1-1.zip", req2.getjobJarUrl()); + } + @Test public void testJobClusterArtifactUpdate() throws Exception { TestKit probe = new TestKit(system); From 9ddcd1b5b903bc844aef181c2946162992d0fe29 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Thu, 9 Jan 2025 15:27:28 -0800 Subject: [PATCH 7/9] fix https jar url (#743) --- .../jobcluster/proto/JobClusterManagerProto.java | 2 +- .../mantisrx/master/jobcluster/JobClusterAkkaTest.java | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java index 91bf59906..6612b9c79 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/proto/JobClusterManagerProto.java @@ -557,7 +557,7 @@ public UpdateJobClusterArtifactRequest( // json property "url". this.jobJarUrl = jobJarUrl != null ? jobJarUrl : - (artifact.startsWith("http://") ? artifact : "http://" + artifact); + (artifact.startsWith("http://") || artifact.startsWith("https://") ? artifact : "http://" + artifact); this.version = version; this.skipSubmit = skipSubmit; this.user = user; diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java index c04096a8f..f9dad2467 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java @@ -811,6 +811,16 @@ public void testJobClusterArtifactUpdateBackCompat() throws Exception { "user"); assertEquals("artifact1-1.zip", req2.getArtifactName()); assertEquals("http://artifact1-1.zip", req2.getjobJarUrl()); + + UpdateJobClusterArtifactRequest req3 = new UpdateJobClusterArtifactRequest( + clusterName, + "https://path1/artifact1-1.zip", + null, + "1", + true, + "user"); + assertEquals("https://path1/artifact1-1.zip", req3.getArtifactName()); + assertEquals("https://path1/artifact1-1.zip", req3.getjobJarUrl()); } @Test From 292c4197c8bbfc3b74fd4feae4df96803a17c6ae Mon Sep 17 00:00:00 2001 From: crioux-stripe <115596126+crioux-stripe@users.noreply.github.com> Date: Fri, 10 Jan 2025 12:12:04 -0800 Subject: [PATCH 8/9] Add a rebatch call to RemoteObservable connections between stages. (#744) * Add a rebatch call to RemoteObservable connections between stages. * Rename property to workerClient.buffer.size --- .../remote/observable/RemoteObservable.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java b/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java index 307ece777..61cd15566 100644 --- a/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java +++ b/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java @@ -45,6 +45,7 @@ import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import mantis.io.reactivex.netty.RxNetty; import mantis.io.reactivex.netty.channel.ObservableConnection; @@ -72,6 +73,10 @@ public class RemoteObservable { private static boolean enableNettyLogging = false; private static boolean enableCompression = true; private static int maxFrameLength = 5242880; // 5 MB max frame + private static int bufferSize = 0; + private static final String DEFAULT_BUFFER_SIZE_STR = "0"; + + // NJ static { @@ -106,6 +111,9 @@ private static void loadFastProperties() { if (maxFrameLengthStr != null && maxFrameLengthStr.length() > 0) { maxFrameLength = Integer.parseInt(maxFrameLengthStr); } + + String bufferSizeStr = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("workerClient.buffer.size", DEFAULT_BUFFER_SIZE_STR); + bufferSize = Integer.parseInt(Optional.ofNullable(bufferSizeStr).orElse(DEFAULT_BUFFER_SIZE_STR)); } private static Func1, ? extends Observable> retryLogic(final @@ -249,7 +257,8 @@ public Observable call(final ObservableConnection("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")); + .lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")) + .rebatchRequests(bufferSize <= 0 ? 1 : bufferSize); } }) .doOnCompleted(new Action0() { @@ -394,7 +403,8 @@ public Observable call(final ObservableConnection("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")); + .lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")) + .rebatchRequests(bufferSize <= 0 ? 1 : bufferSize); } }) .doOnCompleted(new Action0() { @@ -518,7 +528,8 @@ public Observable call(final ObservableConnection("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer")); + .lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer")) + .rebatchRequests(bufferSize <= 0 ? 1 : bufferSize); } }) .doOnCompleted(new Action0() { From 86a09164fc52cec8d1d40e5e90ab2db4789d7733 Mon Sep 17 00:00:00 2001 From: kmg-stripe Date: Mon, 13 Jan 2025 14:20:19 -0800 Subject: [PATCH 9/9] Handle Case Where Recently Launched Worker Does Not Immediately Heartbeat (#741) * Handle Case Where Recently Launched Worker Does Not Immediately Heartbeat It seems like there is a race condition where a recently launched worker has not sent a heartbeat, the duration is still within the missed heartbeat threshold and the JobActor treats the lack of heartbeat with a resubmit. If this is true, this can lead to mass resubmits when a new leader is elected. * Update upload-artifact to v4 * Temporarily fix tests --- .github/workflows/nebula-ci.yml | 4 ++-- .../io/mantisrx/master/jobcluster/job/JobActor.java | 10 +++++++--- .../master/jobcluster/job/JobTestLifecycle.java | 4 ++-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/.github/workflows/nebula-ci.yml b/.github/workflows/nebula-ci.yml index a101c8330..ff503a115 100644 --- a/.github/workflows/nebula-ci.yml +++ b/.github/workflows/nebula-ci.yml @@ -48,7 +48,7 @@ jobs: CI_BRANCH: ${{ github.ref }} COVERALLS_REPO_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Upload Test Results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: always() with: name: Unit Test Results @@ -59,7 +59,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Upload - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: Event File path: ${{ github.event_path }} diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java index b879b8be5..1efb73b20 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/job/JobActor.java @@ -1943,9 +1943,13 @@ public void checkHeartBeats(Instant currentTime) { acceptedAt); } } else { - // no heartbeat or heartbeat too old - if (!workerMeta.getLastHeartbeatAt().isPresent() || Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds() - > missedHeartBeatToleranceSecs) { + // no heartbeat in a timely manner since launched or heartbeat too old + // note: the worker has been launched + boolean noTimelyHeartbeatSinceLaunched = !workerMeta.getLastHeartbeatAt().isPresent() + && Duration.between(Instant.ofEpochSecond(workerMeta.getLaunchedAt()), currentTime).getSeconds() > missedHeartBeatToleranceSecs; + boolean heartbeatTooOld = workerMeta.getLastHeartbeatAt().isPresent() + && Duration.between(workerMeta.getLastHeartbeatAt().get(), currentTime).getSeconds() > missedHeartBeatToleranceSecs; + if (noTimelyHeartbeatSinceLaunched || heartbeatTooOld) { this.numWorkerMissingHeartbeat.increment(); if (!workerMeta.getLastHeartbeatAt().isPresent()) { diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java index 31ec5c45d..1319751b1 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/JobTestLifecycle.java @@ -823,9 +823,9 @@ public void testNoHeartBeatAfterLaunchResubmit() { assertEquals(JobState.Accepted, resp4.getJobMetadata().get().getState()); // 1 original submissions and 0 resubmits because of worker not in launched state with HB timeouts - verify(schedulerMock, times(2)).scheduleWorkers(any()); + verify(schedulerMock, times(1)).scheduleWorkers(any()); // 1 kills due to resubmits - verify(schedulerMock, times(1)).unscheduleAndTerminateWorker(eq(workerId2), any()); + verify(schedulerMock, times(0)).unscheduleAndTerminateWorker(eq(workerId2), any()); } catch (Exception e) { fail("unexpected exception " + e.getMessage()); }