
Implement batch schedule request #580

Merged

fdc-ntflx merged 2 commits into master from batch-job-schedule on Nov 27, 2023

Conversation

@fdc-ntflx (Collaborator) commented Nov 7, 2023

Introduction

This PR addresses the issue we've been having with large job requests and the management of idle containers.
Today, if a large job request (200+ nodes) is scheduled and it uses up all the idle containers on the RC, the scaler is not aware of the remaining pending scheduled workers and will still only scale up to the idle size (so any newly added containers are immediately consumed by pending workers). This forces us to pre-scale ASGs manually and to allocate a hefty idle worker buffer. To improve system efficiency, we've revised this mechanic to include pending worker counts in the scaler logic.

Details

In essence, the scheduling procedure shifts from per-worker to per-job: once a job is submitted, it either gets all of its required workers or none. For instance, if there are 20 idle workers of a certain type and a new job requires 50, we hold the request until the scaler scales the ASG up with 50 extra containers. The existing 20 idle containers can then be used for other tasks such as job autoscaling and worker resubmission.

Our solution provides several benefits:

  • No longer need to manually scale up ASG during a large submission.
  • Improved resource utilization - the idle worker buffer pool can potentially be shrunk, making the whole system more cost-effective because we should be able to scale up "on-demand" (generally, job launch latencies are not as important as autoscaling requests).
  • Improved autoscaling activity - The new approach ensures that any pending autoscaling activity is taken into account before assigning idle containers, thereby reducing the possibility of saturation.

Detailed List of Changes per Component

This update affects the principal components/actors: the job actor, the scheduler actor, the RC actor, and the scaler actor. Let's delve into the detailed changes for each:

Job Actor

The Job Actor now issues only batch schedule requests, with a few modifications. For autoscaling or worker resubmission requests, the batch contains a single worker. For job submission requests (the submitInitialWorkers function), all requested workers are included in one batch; the same applies to all pending requests restored after a master leader change (the initializeRunningWorkers function). Apart from these changes, the actor functions as before.
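
A minimal sketch of this batching rule (the types below are stand-ins for illustration, not the actual Mantis classes):

// Illustrative sketch only: these types are placeholders, not the real Mantis classes.
import java.util.List;

final class BatchingRuleSketch {

    record WorkerRequest(String jobId, int workerIndex) {}
    record BatchScheduleRequest(String jobId, List<WorkerRequest> requests) {}

    // Job submission (submitInitialWorkers) and leader-change recovery (initializeRunningWorkers):
    // every requested worker goes into a single batch, so the job is scheduled all-or-nothing.
    static BatchScheduleRequest forJobSubmission(String jobId, List<WorkerRequest> allWorkers) {
        return new BatchScheduleRequest(jobId, List.copyOf(allWorkers));
    }

    // Autoscaling or worker resubmission: the batch carries exactly one worker,
    // which preserves the old per-worker retry semantics.
    static BatchScheduleRequest forSingleWorker(WorkerRequest worker) {
        return new BatchScheduleRequest(worker.jobId(), List.of(worker));
    }
}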

Scheduler Actor

First, scheduleWorker was replaced by scheduleWorkers, which handles any schedule request.

  • For autoscaling or worker resubmission (a single worker in the request), the new scheduler falls back to the "old" per-worker scheduling logic.
  • When more than one worker is present (job submission requests), the scheduler first asks the RC actor to assign all workers at once. If the assignment fails, a job-id-specific timer is set for retry. If it succeeds, each individual worker's assignment is handled separately, enabling individual rescheduling if needed (see the sketch below).
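
A rough sketch of this dispatch rule, in plain Java with no Akka; the helper interfaces and the retry delay are placeholders, not the real scheduler actor code:

// Sketch only: the real logic lives in the scheduler actor and uses Akka timers.
import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class SchedulerDispatchSketch {

    record WorkerRequest(String jobId, int workerIndex) {}
    record Assignment(WorkerRequest request, String taskExecutorId) {}
    static final class NoResourceAvailableException extends Exception {}

    interface ResourceCluster {
        // All-or-nothing: throws if the whole batch cannot be placed.
        List<Assignment> assignAll(List<WorkerRequest> batch) throws NoResourceAvailableException;
    }

    private final ResourceCluster resourceCluster;
    private final Consumer<WorkerRequest> legacySingleWorkerPath;   // the "old" per-worker scheduling
    private final Consumer<Assignment> perWorkerHandler;            // enables individual rescheduling
    private final BiConsumer<String, Duration> retryTimerByJobId;   // retry timer keyed by job id

    SchedulerDispatchSketch(ResourceCluster rc, Consumer<WorkerRequest> legacy,
                            Consumer<Assignment> perWorker, BiConsumer<String, Duration> retryTimer) {
        this.resourceCluster = rc;
        this.legacySingleWorkerPath = legacy;
        this.perWorkerHandler = perWorker;
        this.retryTimerByJobId = retryTimer;
    }

    void onBatchScheduleRequest(String jobId, List<WorkerRequest> batch) {
        if (batch.size() == 1) {
            // Autoscaling / worker resubmission: keep worker-level retries.
            legacySingleWorkerPath.accept(batch.get(0));
            return;
        }
        try {
            // Job submission: ask the RC actor to place all workers at once.
            resourceCluster.assignAll(batch).forEach(perWorkerHandler);
        } catch (NoResourceAvailableException e) {
            // Not enough capacity yet: retry the whole job later (delay is a placeholder).
            retryTimerByJobId.accept(jobId, Duration.ofSeconds(30));
        }
    }
}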

RC Actor

The RC actor now handles assignment requests for one or more workers.

  • For a single-worker request, nothing has changed.
  • For batch requests with more than one worker, the actor first tries to find the best fit for all the workers simultaneously. If successful, it caches the job artifacts, marks the TE state as 'assigned', and sets a timer to check assignment health later.
  • On failure, a 'no resource available' exception is raised. Before raising the exception, the request is cached as pending so this information can be accounted for when the scaler requests cluster usage.
  • A cache of the pending requests is maintained, keyed by job id (see the sketch after this list).
  • The cache is invalidated in three ways:
    1. While serving usage requests, after verifying that all the workers for a pending job are now running.
    2. Via an explicit signal (necessity to be confirmed).
    3. Jobs sitting in the cache for longer than 10 minutes are invalidated; we don't expect requests to take that long to fulfil, and even if one does, the scheduler will send a new request to the RC actor.
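
A minimal sketch of this cache and its three invalidation paths; names and structure are illustrative assumptions, not the actual RC actor implementation:

// Minimal sketch of the pending-request cache described above; not the actual RC actor code.
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

final class PendingJobRequestCacheSketch {

    record PendingJob(List<String> workerIds, Instant createdAt) {}

    private static final Duration EXPIRY = Duration.ofMinutes(10);
    private final Map<String, PendingJob> pendingByJobId = new ConcurrentHashMap<>();

    // Cached right before the 'no resource available' exception is raised, so the
    // scaler can account for these workers on the next usage request.
    void markPending(String jobId, List<String> workerIds) {
        pendingByJobId.putIfAbsent(jobId, new PendingJob(List.copyOf(workerIds), Instant.now()));
    }

    // Invalidation path 1: a usage request observes that all workers of the job are running.
    void invalidateIfAllRunning(String jobId, Predicate<String> isWorkerRunning) {
        PendingJob pending = pendingByJobId.get(jobId);
        if (pending != null && pending.workerIds().stream().allMatch(isWorkerRunning)) {
            pendingByJobId.remove(jobId);
        }
    }

    // Invalidation path 2: explicit signal (e.g. the job was killed or unscheduled).
    void invalidate(String jobId) {
        pendingByJobId.remove(jobId);
    }

    // Invalidation path 3: entries older than 10 minutes are dropped; the scheduler will
    // send a fresh batch request if the job is still waiting.
    void expireStaleEntries() {
        Instant cutoff = Instant.now().minus(EXPIRY);
        pendingByJobId.values().removeIf(p -> p.createdAt().isBefore(cutoff));
    }

    // Folded into the usage response so the scaler sees pending demand per SKU.
    int pendingWorkerCount() {
        return pendingByJobId.values().stream().mapToInt(p -> p.workerIds().size()).sum();
    }
}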

Scaler Actor

No specific changes are needed in the scaler actor itself. It works by comparing the idle count and total count for a given SKU against the target idle count. The change here is that pending workers are now subtracted from the current idle count reported in the usage response by the RC actor.
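
For illustration, the adjusted idle math using the 20-idle/50-worker example from the description (in the PR itself the subtraction happens in the RC actor's usage response; it is folded into one helper here):

// Illustrative arithmetic only; the real scaler just compares the reported idle count to its target.
final class ScalerIdleMathSketch {

    static int scaleUpBy(int idleCount, int pendingWorkers, int targetIdleCount) {
        int effectiveIdle = idleCount - pendingWorkers;       // can go negative for large jobs
        return Math.max(0, targetIdleCount - effectiveIdle);  // containers to add to the ASG
    }

    public static void main(String[] args) {
        // Example from the description: 20 idle workers, a new job needs 50.
        // Without pending accounting the scaler sees 20 idle and stays put; with it,
        // effective idle is -30, so (for a target idle of 20) it scales up by 50.
        System.out.println(scaleUpBy(20, 50, 20)); // 50
    }
}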

Tests

The new scheduler logic was tested across various scenarios, including single/multiple jobs, different SKUs, and autoscaling cases, as well as unusual and error cases such as leader failover or jobs stuck in the accepted state for too long. The full list:

  • Use case 0: Single job with single stage (no autoscaling) - no resources available
  • Use case 1: Single job with multiple stages with same sku (no autoscaling) - only 1 sku with no resources available
  • Use case 2: Single job with multiple stages with different skus (no autoscaling) - only 1 sku with no resources available
  • Use case 3: Single job with multiple stages with different skus (no autoscaling) - 2 skus with no resources available
  • Use case 4: 2/3 jobs with single stage with same sku (no autoscaling) - 1 sku with no resources available
  • Use case 5: 2/3 jobs with multiple stages with different skus (no autoscaling) - only 1 sku with no resources available
  • Use case 6: 2/3 jobs with multiple stages with different skus (no autoscaling) - 2 skus with no resources available
  • Use case 7: Single job with multiple skus + autoscaling - only 1 sku with no resources available
  • Use case 8: 2/3 jobs with multiple skus + autoscaling - only 1 sku with no resources available
  • Use case 9: 2/3 jobs with multiple skus + autoscaling - 2 skus with no resources available
  • Use case 10: Manually autoscale single job with single stage - no resources available
  • Use case 11: Manually autoscale single job with multiple stages - no resources available
  • Use case 12: 2/3 jobs with multiple skus (no autoscaling) - autoscale 1 manually and launch 1 large to trigger no resource available
  • Error Case 1: Leader failover in the middle of successful batch request
  • Error Case 2: Leader failover in the middle of pending batch request
  • Error Case 3: Job stuck in accepted state for longer than timeout
  • Other Case: Kill job before job gets scheduled + no resources available
  • Other Case: No resource available for too long → cache expiration

Todo

  • Implement the notion of a 'minimum buffer': preserve a specific number of workers exclusively for critical operations such as autoscaling and worker replacement. This reserved buffer ensures resources are always available for these high-priority operations, which take precedence over launching new jobs.
  • Reconsider the unscheduleJob() function: at present, this function cancels the scheduler timers responsible for retrying batch schedule requests and covers a corner case in the invalidation of the pending job cache. If we can rely on cache timeouts instead, its role may no longer be necessary; the goal is to find a cleaner strategy that, ideally, leans on cache timeouts.
  • Rework the 'cores count' attribute in the JobRequirement class: today we express compute requirements as a core count, which cannot capture the range of changes we have in mind for SKUs and ASGs. For instance, the current model cannot distinguish two SKUs with identical core counts, or two ASGs running the same SKU with different runtimes (useful for testing different JDK versions). We propose replacing 'cores count' with a broader "machine definition interface" carrying the machine definition plus tags/labels, giving a richer description of job requirements (see the hypothetical sketch below).
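
A purely hypothetical sketch of what such an interface could look like:

// Purely hypothetical; none of these names exist in the codebase yet.
import java.util.Map;

interface MachineDefinition {
    double cpuCores();
    double memoryMB();
    // Tags/labels would let two SKUs share the same core count, or two ASGs run the
    // same SKU with different runtimes (e.g. different JDK versions for testing).
    Map<String, String> labels();
}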

github-actions bot commented Nov 7, 2023

Test Results

130 files ±0   130 suites ±0   7m 34s ⏱️ +14s
547 tests −1   538 ✔️ −2   8 💤 ±0   1 ❌ +1
548 runs ±0   539 ✔️ −1   8 💤 ±0   1 ❌ +1

For more details on these failures, see this check.

Results for commit 73c723f. Comparison against base commit c0c403c.

This pull request removes 1 test.
io.mantisrx.master.api.akka.route.v1.JobsRouteTest ‑ testIt

♻️ This comment has been updated with latest results.

@fdc-ntflx fdc-ntflx temporarily deployed to Integrate Pull Request November 7, 2023 03:00 — with GitHub Actions Inactive
github-actions bot commented Nov 7, 2023

Uploaded Artifacts

To use these artifacts in your Gradle project, paste the following lines in your build.gradle.

resolutionStrategy {
    force "io.mantisrx:mantis-discovery-proto:0.1.0-20231127.183238-445"
    force "io.mantisrx:mantis-client:0.1.0-20231127.183238-446"
    force "io.mantisrx:mantis-common-serde:0.1.0-20231127.183238-445"
    force "io.mantisrx:mantis-common:0.1.0-20231127.183238-445"
    force "io.mantisrx:mantis-runtime-loader:0.1.0-20231127.183238-446"
    force "io.mantisrx:mantis-runtime:0.1.0-20231127.183238-446"
    force "io.mantisrx:mantis-remote-observable:0.1.0-20231127.183238-446"
    force "io.mantisrx:mantis-network:0.1.0-20231127.183238-445"
    force "io.mantisrx:mantis-testcontainers:0.1.0-20231127.183238-115"
    force "io.mantisrx:mantis-connector-iceberg:0.1.0-20231127.183238-444"
    force "io.mantisrx:mantis-shaded:0.1.0-20231127.183238-444"
    force "io.mantisrx:mantis-connector-publish:0.1.0-20231127.183238-445"
    force "io.mantisrx:mantis-connector-job:0.1.0-20231127.183238-446"
    force "io.mantisrx:mantis-connector-kafka:0.1.0-20231127.183238-446"
    force "io.mantisrx:mantis-control-plane-client:0.1.0-20231127.183238-445"
    force "io.mantisrx:mantis-examples-core:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-examples-groupby-sample:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-control-plane-core:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-control-plane-server:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-examples-jobconnector-sample:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-examples-sine-function:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-examples-synthetic-sourcejob:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-examples-mantis-publish-sample:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-publish-core:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-publish-netty:0.1.0-20231127.183238-438"
    force "io.mantisrx:mantis-publish-netty-guice:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-examples-wordcount:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-server-worker:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-server-agent:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-source-job-kafka:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-server-worker-client:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-examples-twitter-sample:0.1.0-20231127.183238-439"
    force "io.mantisrx:mantis-source-job-publish:0.1.0-20231127.183238-439"
}

@fdc-ntflx fdc-ntflx temporarily deployed to Integrate Pull Request November 10, 2023 21:10 — with GitHub Actions Inactive
log.info("Received batch schedule request event: {}", event);

// If the size of the batch request is 1 we'll handle it as the "old" schedule request
if (event.getRequest().getScheduleRequests().size() == 1) {
Collaborator:
any reason/upside to keeping the 1 node case special?

Collaborator (author):
Good question. For actions like autoscaling or resubmitting a worker, scheduling requests will always involve one worker at a time. In those situations we'd rather use the ScheduleRequestEvent lifecycle, not the BatchScheduleRequestEvent, because ScheduleRequestEvent lets us retry at the worker level. That means setting up timers per worker (keyed by worker-id) instead of per job (keyed by job-id) as in BatchScheduleRequestEvent, where the timers would collide.

private void onFailedToBatchScheduleRequestEvent(FailedToBatchScheduleRequestEvent event) {
batchSchedulingFailures.increment();
if (event.getAttempt() >= this.maxScheduleRetries) {
log.error("Failed to submit the batch request {} because of ", event.getScheduleRequestEvent(), event.getThrowable());
Collaborator:
in this case does it need to route some message/status back to the job actor?

Collaborator (author):
I'm following the behavior we currently have during scheduling ops ->

log.error("Failed to submit the request {} because of ", event.getScheduleRequestEvent(), event.getThrowable());

Collaborator (author):
do you think we should change this behavior? If so, how?

Collaborator:
I guess in this case there are no more retries/actions for this job, so maybe we should mark the job as failed instead? That way users won't think the job is still being handled.


@Override
public void unscheduleJob(String jobId) {
schedulerActor.tell(CancelBatchRequestEvent.of(jobId),null);
Collaborator:
CancelBatchRequestEvent seems to only cancel the retrying on the scheduler side. I'm wondering what happens if the rcActor is sending the assignment request while this is happening. Will the agent still get the cancel request?

Collaborator (author):
Not sure I understand the question. If you're asking what happens if the user decides to kill the job, there are 2 cases:

  1. job is in pending state because not enough resources are available. In this case we'll simply delete the timer so the scheduler won't retry.
  2. job is in accepted state and workers are already assigned. Those will still get cancelled as we're currently doing via unscheduleAndTerminateWorker.

@@ -1357,7 +1358,14 @@ private void initializeRunningWorkers() {

scheduler.initializeRunningWorker(scheduleRequest, wm.getSlave(), wm.getSlaveID());
} else if (wm.getState().equals(WorkerState.Accepted)) {
Collaborator:
Is this a potential source of state mismatch: a worker has been submitted to an agent, but the worker info (wm) fails to update here during the leader switch, so the worker gets queued again while the previously assigned agent is already running it?

assertEquals(SCALE_GROUP_1,
-   Objects.requireNonNull(bestFitO.get().getRight().getRegistration())
+   Objects.requireNonNull(bestFitO.get().getBestFit().values().stream().findFirst().get().getRight().getRegistration())
Collaborator:
it would be nice to have some more UTs around the pending worker set scenarios here.

@Andyz26 (Collaborator) commented Nov 16, 2023

lgtm thanks a lot!

@@ -317,12 +386,35 @@ public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) {
} else {
usageByGroupKey.put(groupKey, kvState);
}

if ((value.isAssigned() || value.isRunningTask()) && value.getWorkerId() != null) {
Collaborator:
I think we need metrics to track the pending set size with tags.

});

GetClusterUsageResponseBuilder resBuilder = GetClusterUsageResponse.builder().clusterID(req.getClusterID());
usageByGroupKey.forEach((key, value) -> resBuilder.usage(UsageByGroupKey.builder()
.usageGroupKey(key)
-   .idleCount(value.getLeft())
+   .idleCount(value.getLeft() - pendingCountByGroupKey.get(key))
Collaborator:
Currently, we disable the scaler during an upgrade. With this logic, job scheduling might see more interruptions. Do you think we need some special handling during upgrades?

@fdc-ntflx fdc-ntflx temporarily deployed to Integrate Pull Request November 27, 2023 18:31 — with GitHub Actions Inactive
@fdc-ntflx fdc-ntflx merged commit b244630 into master Nov 27, 2023
6 of 7 checks passed
@fdc-ntflx fdc-ntflx deleted the batch-job-schedule branch November 27, 2023 18:40