Implement batch schedule request #580

Merged 2 commits on Nov 27, 2023

Changes from all commits
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import lombok.Value;
@@ -100,14 +101,13 @@ default CompletableFuture<List<TaskExecutorID>> getUnregisteredTaskExecutors() {

/**
* Can throw {@link NoResourceAvailableException} wrapped within the CompletableFuture in case there
- * are no task executors.
+ * are not enough available task executors.
*
- * @param machineDefinition machine definition that's requested for the worker
- * @param workerId worker id of the task that's going to run on the node.
- * @return task executor assigned for the particular task.
+ * @param allocationRequests set of machine definitions requested for 1 or more workers
+ * @return task executors assigned for the particular request
*/
-CompletableFuture<TaskExecutorID> getTaskExecutorFor(
-    TaskExecutorAllocationRequest allocationRequest);
+CompletableFuture<Map<TaskExecutorAllocationRequest, TaskExecutorID>> getTaskExecutorsFor(
+    Set<TaskExecutorAllocationRequest> allocationRequests);
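(Editorial aside: a minimal caller sketch for the new batch API. The requestA/requestB objects and the resourceCluster reference are hypothetical placeholders; only getTaskExecutorsFor and its parameter and return types come from this diff.)

    // Hypothetical requests; TaskExecutorAllocationRequest construction is not shown in this diff.
    Set<TaskExecutorAllocationRequest> requests = new HashSet<>(Arrays.asList(requestA, requestB));
    resourceCluster.getTaskExecutorsFor(requests)
        .thenAccept(assignments ->
            // On success the map holds one TaskExecutorID per allocation request.
            assignments.forEach((request, executorID) ->
                LOGGER.info("Assigned executor {} to request {}", executorID, request)));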

/**
* Returns the Gateway instance to talk to the task executor. If unable to make connection with
@@ -1317,6 +1317,7 @@ void initialize(boolean isSubmit) throws Exception {
private void initializeRunningWorkers() {
// Scan for the list of all corrupted workers to be resubmitted.
List<JobWorker> workersToResubmit = markCorruptedWorkers();
+List<IMantisWorkerMetadata> workersToSubmit = new ArrayList<>();

// publish a refresh before enqueuing tasks to the Scheduler, as there is a potential race between
// WorkerRegistryV2 getting updated and isWorkerValid being called from SchedulingService loop
@@ -1357,7 +1358,14 @@ private void initializeRunningWorkers() {

scheduler.initializeRunningWorker(scheduleRequest, wm.getSlave(), wm.getSlaveID());
} else if (wm.getState().equals(WorkerState.Accepted)) {
Reviewer comment (Collaborator):
Is this a potential source of state mismatch? If a worker was already submitted to an agent but the worker info (wm) here was not updated before a leader switch, the worker would be queued again while the previously assigned agent is still running the same worker.

-queueTask(wm);
+// If the job is in the accepted state, queue all of its pending workers at once in a batch request.
+// This matters when there were pending batch requests before a master failover.
+if (JobState.isAcceptedState(mantisJobMetaData.getState())) {
+    workersToSubmit.add(wm);
+} else {
+    queueTask(wm);
+}
}
}

@@ -1367,6 +1375,10 @@ private void initializeRunningWorkers() {
}
}

+if (JobState.isAcceptedState(mantisJobMetaData.getState()) && !workersToSubmit.isEmpty()) {
+    queueTasks(workersToSubmit, empty());
+}

// publish another update after queuing tasks to Fenzo (in case some workers were marked Started
// due to the Fake heartbeat in above loop)
markStageAssignmentsChanged(true);
@@ -1475,40 +1487,37 @@ private void submitInitialWorkers() throws Exception {
mantisJobMetaData.getJobDefinition(),
System.currentTimeMillis());

-int beg = 0;
-while (true) {
-    if (beg >= workers.size()) {
-        break;
-    }
-    int en = beg + Math.min(workerWritesBatchSize, workers.size() - beg);
-    final List<IMantisWorkerMetadata> workerRequests = workers.subList(beg, en);
-    try {
-        jobStore.storeNewWorkers(jobMgr.getJobDetails(), workerRequests);
-        LOGGER.info("Stored workers {} for Job {}", workerRequests, jobId);
-        // refresh Worker Registry state before enqueuing task to Scheduler
-        markStageAssignmentsChanged(true);
-        workerRequests.forEach(this::queueTask);
-    } catch (Exception e) {
-        LOGGER.error("Error {} storing workers of job {}", e.getMessage(), jobId.getId(), e);
-        throw new RuntimeException("Exception saving worker for Job " + jobId, e);
-    }
-    beg = en;
-}
+try {
+    jobStore.storeNewWorkers(jobMgr.getJobDetails(), workers);
+    LOGGER.info("Stored workers {} for Job {}", workers, jobId);
+    // refresh Worker Registry state before enqueuing task to Scheduler
+    markStageAssignmentsChanged(true);
+
+    if (!workers.isEmpty()) {
+        // queue to scheduler
+        queueTasks(workers, empty());
+    }
+} catch (Exception e) {
+    LOGGER.error("Error {} storing workers of job {}", e.getMessage(), jobId.getId(), e);
+    throw new RuntimeException("Exception saving worker for Job " + jobId, e);
+}

-private void queueTask(final IMantisWorkerMetadata workerRequest, final Optional<Long> readyAt) {
-    final ScheduleRequest schedulingRequest = createSchedulingRequest(workerRequest, readyAt);
-    LOGGER.info("Queueing up scheduling request {} ", schedulingRequest);
+private void queueTasks(final List<IMantisWorkerMetadata> workerRequests, final Optional<Long> readyAt) {
+    final List<ScheduleRequest> scheduleRequests = workerRequests
+        .stream()
+        .map(wR -> createSchedulingRequest(wR, readyAt))
+        .collect(Collectors.toList());
+    LOGGER.info("Queueing up batch schedule request for {} workers", workerRequests.size());
    try {
-        scheduler.scheduleWorker(schedulingRequest);
+        scheduler.scheduleWorkers(new BatchScheduleRequest(scheduleRequests));
    } catch (Exception e) {
-        LOGGER.error("Exception queueing task", e);
+        LOGGER.error("Exception queueing tasks", e);
    }
}

private void queueTask(final IMantisWorkerMetadata workerRequest) {
-queueTask(workerRequest, empty());
+queueTasks(Collections.singletonList(workerRequest), empty());
}

private ScheduleRequest createSchedulingRequest(
@@ -1672,6 +1681,7 @@ private IMantisWorkerMetadata addWorker(SchedulingInfo schedulingInfo, int stage

@Override
public void shutdown() {
+scheduler.unscheduleJob(jobId.getId());
// if workers have not already completed
if (!allWorkerCompleted()) {
// kill workers
@@ -1805,6 +1815,9 @@ public void checkHeartBeats(Instant currentTime) {
Instant acceptedAt = Instant.ofEpochMilli(workerMeta.getAcceptedAt());
if (Duration.between(acceptedAt, currentTime).getSeconds() > stuckInSubmitToleranceSecs) {
// worker stuck in accepted
LOGGER.info("Job {}, Worker {} stuck in accepted state for {}", this.jobMgr.getJobId(),
workerMeta.getWorkerId(), Duration.between(acceptedAt, currentTime).getSeconds());

workersToResubmit.add(worker);
eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(
WARN,
@@ -2016,6 +2029,7 @@ else if (currentWorkerNum < eventWorkerNum) {
if (allWorkerStarted()) {
allWorkersStarted = true;
jobMgr.onAllWorkersStarted();
+scheduler.unscheduleJob(jobId.getId());
markStageAssignmentsChanged(true);
} else if (allWorkerCompleted()) {
LOGGER.info("Job {} All workers completed1", jobId);
@@ -2169,7 +2183,7 @@ private void resubmitWorker(JobWorker oldWorker) throws Exception {
// publish a refresh before enqueuing new Task to Scheduler
markStageAssignmentsChanged(true);
// queue the new worker for execution
-queueTask(newWorker.getMetadata(), delayDuration);
+queueTasks(Collections.singletonList(newWorker.getMetadata()), delayDuration);
LOGGER.info("Worker {} successfully queued for scheduling", newWorker);
numWorkerResubmissions.increment();
} else {
@@ -181,4 +181,21 @@ public static boolean isRunningState(JobState state) {
return false;
}
}

+/**
+ * Returns true if the job is in the Accepted state.
+ *
+ * @param state the job state to check
+ * @return true if {@code state} is {@code Accepted}, false otherwise
+ */
+public static boolean isAcceptedState(JobState state) {
+    switch (state) {
+        case Accepted:
+            return true;
+        default:
+            return false;
+    }
+}
}
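(Editorial aside: a quick usage sketch of the new helper. JobState.Launched is assumed from the enum's other states; only Accepted appears in this diff.)

    // Only Accepted maps to true; every other state falls through to the default branch.
    boolean batchPath = JobState.isAcceptedState(JobState.Accepted);  // true
    boolean singlePath = JobState.isAcceptedState(JobState.Launched); // false, assuming Launched exists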
@@ -16,9 +16,10 @@

package io.mantisrx.master.resourcecluster;

+import io.mantisrx.master.resourcecluster.ResourceClusterActor.BestFit;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetActiveJobsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetClusterUsageRequest;
-import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
+import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorBatchAssignmentRequest;
import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesRequest;
import io.mantisrx.master.resourcecluster.proto.GetClusterUsageResponse;
import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview;
@@ -29,7 +30,6 @@
import java.util.Set;
import java.util.function.Predicate;
import javax.annotation.Nullable;
-import org.apache.commons.lang3.tuple.Pair;

/**
* A component to manage the states of {@link TaskExecutorState} for a given {@link ResourceClusterActor}.
@@ -79,11 +79,12 @@ Optional<Entry<TaskExecutorID, TaskExecutorState>> findFirst(
Predicate<Entry<TaskExecutorID, TaskExecutorState>> predicate);

/**
- * Find a matched task executor best fitting the given assignment request.
+ * Finds the set of matched task executors best fitting the given assignment requests.
*
* @param request Assignment request.
- * @return Optional of matched task executor.
+ * @return Optional of matched task executors.
*/
-Optional<Pair<TaskExecutorID, TaskExecutorState>> findBestFit(TaskExecutorAssignmentRequest request);
+Optional<BestFit> findBestFit(TaskExecutorBatchAssignmentRequest request);
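(Editorial aside: a hedged caller sketch for the batched findBestFit. The stateManager and batchRequest names are placeholders, and the internal shape of BestFit is not shown in this diff; only the Optional contract is grounded in it.)

    // An empty Optional signals that no set of executors fits the whole batch request.
    Optional<BestFit> bestFit = stateManager.findBestFit(batchRequest);
    if (!bestFit.isPresent()) {
        LOGGER.warn("No matching task executors for batch assignment request {}", batchRequest);
    }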

Set<Entry<TaskExecutorID, TaskExecutorState>> getActiveExecutorEntry();
