From bd6c6ba0dcbc70ee8782dd09c64253b2578f41f8 Mon Sep 17 00:00:00 2001 From: Thorarinn Sigurdsson Date: Wed, 18 Dec 2024 18:54:38 +0100 Subject: [PATCH] perf: optimise solver graph evaluation loop (#6728) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fix makes the `loop`  method in the solver much more efficient for large graphs (lots of actions and dependencies). While graph evaluation in the solver isn't usually the performance bottleneck, the performance impact can be noticeable in larger projects. Here, we use a generator to get leaf nodes ready for processing (whereas previously, we'd traverse the whole pending graph on each loop iteration and load it node-by-node into a new dependency graph data structure, which was costly when iterating over a large graph in a tight loop). We also used a simple boolean flag (dirty) in the solver to track whether any tasks had been requested or completed (successfully or with an error) since the last loop. This was used to avoid evaluating the graph when we know it's not going to result in any changes to the in-progress or pending graphs, or in any new nodes being scheduled for processing. There are further optimizations we can make to the solver (see the comments added in this commit), but they would require careful implementation and testing to avoid possible regressions, so we're leaving them be for now. In a test project that was specifically designed to get the solver to struggle (lots of actions with lots of dependencies), fully resolving the graph before this change took 3 minutes on my laptop, whereas it's down to 55 seconds after these changes. I suspect it could be brought down to 20-30 seconds with the further optimizations outlined in the added code comments, but this might be enough for the time being. Co-authored-by: Steffen Neubauer --- core/src/graph/solver.ts | 111 ++++-- core/test/unit/src/graph/solver.ts | 604 +++-------------------------- 2 files changed, 123 insertions(+), 592 deletions(-) diff --git a/core/src/graph/solver.ts b/core/src/graph/solver.ts index 709cd1c1f5..a3007ede11 100644 --- a/core/src/graph/solver.ts +++ b/core/src/graph/solver.ts @@ -11,10 +11,9 @@ import type { Log } from "../logger/log-entry.js" import type { GardenError, GardenErrorParams } from "../exceptions.js" import { GraphError, toGardenError } from "../exceptions.js" import { uuidv4 } from "../util/random.js" -import { DependencyGraph } from "./common.js" import { Profile } from "../util/profiling.js" import { TypedEventEmitter } from "../util/events.js" -import { groupBy, keyBy } from "lodash-es" +import { keyBy } from "lodash-es" import type { GraphResult, TaskEventBase } from "./results.js" import { GraphResults, resultToString } from "./results.js" import { gardenEnv } from "../constants.js" @@ -53,6 +52,7 @@ export class GraphSolver extends TypedEventEmitter { // Tasks currently running private readonly inProgress: WrappedNodes + private dirty: boolean private inLoop: boolean private log: Log @@ -68,6 +68,7 @@ export class GraphSolver extends TypedEventEmitter { this.log = garden.log.createLog({ name: "graph-solver" }) this.inLoop = false + this.dirty = false this.requestedTasks = {} this.nodes = {} this.pendingNodes = {} @@ -75,12 +76,12 @@ export class GraphSolver extends TypedEventEmitter { this.lock = new AsyncLock() this.on("start", () => { - this.log.silly(`GraphSolver: start`) + this.log.silly(() => `GraphSolver: start`) this.emit("loop", {}) }) this.on("loop", () => { - this.log.silly(`GraphSolver: loop`) + this.log.silly(() => `GraphSolver: loop`) this.loop() }) } @@ -214,31 +215,34 @@ export class GraphSolver extends TypedEventEmitter { }) } - private getPendingGraph() { + private *getPendingLeaves(): Generator { const nodes = Object.values(this.pendingNodes) - const graph = new DependencyGraph() + const visitedKeys = new Set() - const addNode = (node: TaskNode) => { + function* addNode(node: TaskNode) { const key = node.getKey() - if (node.isComplete()) { + if (node.isComplete() || visitedKeys.has(key)) { return } - graph.addNode(key, node) + visitedKeys.add(key) + // TODO: We could optimize further by making this method a generator too. const deps = node.getRemainingDependencies() + if (deps.length === 0) { + // Leaf node found + yield node + return + } for (const dep of deps) { - addNode(dep) - graph.addDependency(key, dep.getKey()) + yield* addNode(dep) } } for (const node of nodes) { - addNode(node) + yield* addNode(node) } - - return graph } start() { @@ -286,6 +290,12 @@ export class GraphSolver extends TypedEventEmitter { if (this.inLoop) { return } + if (!this.dirty) { + // The graph becomes dirty when a task is requested or completed: This means that either a new task node + // may have been added, or that a new result has been set on a node (which will affect the next graph + // evaluation). + return + } this.inLoop = true try { @@ -299,36 +309,56 @@ export class GraphSolver extends TypedEventEmitter { } } - const graph = this.getPendingGraph() + const leafGenerator = this.getPendingLeaves() + let leafCount = 0 - if (graph.size() === 0) { - return - } + const inProgressNodes = Object.values(this.inProgress) - const leaves = graph.overallOrder(true) - const pending = leaves.map((key) => this.nodes[key]) + // Enforce concurrency limits per task type and concurrency group key + const pendingConcurrencyGroupCapacitites: { [key: string]: number } = {} - const inProgressNodes = Object.values(this.inProgress) - const inProgressByGroup = groupBy(inProgressNodes, "type") - - // Enforce concurrency limits per task type - const grouped = groupBy(pending, (n) => n.concurrencyGroupKey) - const limitedByGroup = Object.values(grouped).flatMap((nodes) => { - // Note: We can be sure there is at least one node in the array - const groupLimit = nodes[0].concurrencyLimit - const inProgress = inProgressByGroup[nodes[0].type] || [] - return nodes.slice(0, groupLimit - inProgress.length) - }) + this.dirty = false + + // We could do this with a `groupBy`, but this is more efficient (and the loop method is run frequently). + for (const node of inProgressNodes) { + const groupKey = node.concurrencyGroupKey + if (!pendingConcurrencyGroupCapacitites[groupKey]) { + pendingConcurrencyGroupCapacitites[groupKey] = 0 + } + pendingConcurrencyGroupCapacitites[groupKey]++ + } + const leavesLimitedByGroup: TaskNode[] = [] + for (const node of leafGenerator) { + if (leafCount >= this.hardConcurrencyLimit - inProgressNodes.length) { + // Enforce hard global limit. Note that we never get more leaves than this from `leafGenerator`, which can + // save on compute in big graphs. + break + } + leafCount++ + const groupKey = node.concurrencyGroupKey + // Note: All nodes within a given concurrency group should have the same limit. + const groupLimit = node.concurrencyLimit + if (!pendingConcurrencyGroupCapacitites[groupKey]) { + pendingConcurrencyGroupCapacitites[groupKey] = 0 + } + if (pendingConcurrencyGroupCapacitites[groupKey] >= groupLimit) { + // We've already reached the concurrency limit for this group, so we won't schedule this node now. + continue + } + // There's capacity available for this group, so we schedule the node + leavesLimitedByGroup.push(node) + pendingConcurrencyGroupCapacitites[groupKey]++ + } + if (leafCount === 0) { + return + } - if (limitedByGroup.length === 0) { + if (leavesLimitedByGroup.length === 0) { this.emit("loop", {}) return } - // Enforce hard global limit - const nodesToProcess = limitedByGroup - .slice(0, this.hardConcurrencyLimit - inProgressNodes.length) - .filter((node) => !this.inProgress[node.getKey()]) + const nodesToProcess = leavesLimitedByGroup.filter((node) => !this.inProgress[node.getKey()]) if (nodesToProcess.length === 0) { this.emit("loop", {}) @@ -370,6 +400,7 @@ export class GraphSolver extends TypedEventEmitter { * Processes a single task to completion, handling errors and providing its result to in-progress task batches. */ private async processNode(node: TaskNode, startedAt: Date) { + this.dirty = true // Check for missing dependencies by calculating the input version so we can handle the exception // as a user error before getting deeper into the control flow (where it would result in an internal // error with a noisy stack trace). @@ -381,6 +412,8 @@ export class GraphSolver extends TypedEventEmitter { } this.log.silly(() => `Processing node ${taskStyle(node.getKey())}`) + // TODO-performance: Record that a result or an error has become available for this node, use for looping + // in evaluateRequests. try { const processResult = await node.execute() @@ -394,6 +427,7 @@ export class GraphSolver extends TypedEventEmitter { } private requestTask(params: TaskRequestParams) { + this.dirty = true const request = new RequestTaskNode(params) if (!this.requestedTasks[params.batchId]) { this.requestedTasks[params.batchId] = {} @@ -403,6 +437,7 @@ export class GraphSolver extends TypedEventEmitter { } private evaluateRequests() { + // TODO-performance: Only iterate over requests with new results since last loop for (const [_batchId, requests] of Object.entries(this.requestedTasks)) { for (const request of Object.values(requests)) { if (request.isComplete()) { @@ -430,6 +465,8 @@ export class GraphSolver extends TypedEventEmitter { this.log.silly(() => `Request ${request.getKey()} has ready status and force=false, no need to process.`) this.completeTask({ ...status, node: request }) } else { + // TODO-performance: Add processing nodes for requests only once, during the solve call before looping. + // We create exactly one request node for each requested task, so this is known up front. const processNode = this.getNode({ type: "process", task, statusOnly: request.statusOnly }) const result = this.getPendingResult(processNode) @@ -452,6 +489,7 @@ export class GraphSolver extends TypedEventEmitter { } private ensurePendingNode(node: TaskNode, dependant: TaskNode) { + this.dirty = true const key = node.getKey() const existing = this.pendingNodes[key] @@ -465,6 +503,7 @@ export class GraphSolver extends TypedEventEmitter { } private completeTask(params: CompleteTaskParams & { node: TaskNode }) { + this.dirty = true const node = params.node const result = node.complete(params) delete this.inProgress[node.getKey()] diff --git a/core/test/unit/src/graph/solver.ts b/core/test/unit/src/graph/solver.ts index 82cbcf56d0..c68f1d2067 100644 --- a/core/test/unit/src/graph/solver.ts +++ b/core/test/unit/src/graph/solver.ts @@ -15,6 +15,7 @@ import type { MakeOptional } from "../../../../src/util/util.js" import type { SolveOpts } from "../../../../src/graph/solver.js" import type { ActionState } from "../../../../src/actions/types.js" import { GardenError, GenericGardenError } from "../../../../src/exceptions.js" +import { range } from "lodash-es" const projectRoot = getDataDir("test-project-empty") @@ -28,6 +29,8 @@ interface TestTaskParams extends CommonTaskParams { statusCallback?: TestTaskCallback dependencies?: BaseTask[] statusDependencies?: BaseTask[] + statusConcurrencyLimit?: number + executeConcurrencyLimit?: number } interface TestTaskResult extends ValidResultType { @@ -41,9 +44,12 @@ interface TestTaskResult extends ValidResultType { // TODO-G2: Implement equivalent test cases for the new graph +const defaultStatusConcurrencyLimit = 10 +const defaultExecuteConcurrencyLimit = 10 + export class TestTask extends BaseTask { - override readonly statusConcurrencyLimit = 10 - override readonly executeConcurrencyLimit = 10 + override statusConcurrencyLimit = defaultStatusConcurrencyLimit + override executeConcurrencyLimit = defaultExecuteConcurrencyLimit readonly type = "test" @@ -61,6 +67,8 @@ export class TestTask extends BaseTask { this.callback = params.callback || null this.dependencies = params.dependencies || [] this.statusDependencies = params.statusDependencies || [] + this.statusConcurrencyLimit = params.statusConcurrencyLimit || defaultStatusConcurrencyLimit + this.executeConcurrencyLimit = params.executeConcurrencyLimit || defaultExecuteConcurrencyLimit } resolveStatusDependencies() { @@ -352,558 +360,42 @@ describe("GraphSolver", () => { expect(depResults!.dependencyResults?.["test.task-a"]?.outputs["processed"]).to.equal(false) }) - // TODO-G2: update these once we're decided on the event formats - - // it("should emit a taskPending event when adding a task", async () => { - // const now = freezeTime() - - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - // const task = new TestTask(garden, "a", false) - - // const result = await graph.process([task]) - // const generatedBatchId = result?.a?.batchId || uuidv4() - - // expect(garden.events.eventLog).to.eql([ - // { name: "taskGraphProcessing", payload: { startedAt: now } }, - // { - // name: "taskPending", - // payload: { - // addedAt: now, - // batchId: generatedBatchId, - // key: task.getBaseKey(), - // name: task.name, - // type: task.type, - // }, - // }, - // { - // name: "taskProcessing", - // payload: { - // startedAt: now, - // batchId: generatedBatchId, - // key: task.getBaseKey(), - // name: task.name, - // type: task.type, - // versionString: task.version, - // }, - // }, - // { name: "taskComplete", payload: toGraphResultEventPayload(result["a"]!) }, - // { name: "taskGraphComplete", payload: { completedAt: now } }, - // ]) - // }) - - // it("should throw if tasks have circular dependencies", async () => { - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - // const taskA = new TestTask(garden, "a", false) - // const taskB = new TestTask(garden, "b", false, { dependencies: [taskA] }) - // const taskC = new TestTask(garden, "c", false, { dependencies: [taskB] }) - // taskA["dependencies"] = [taskC] - // const errorMsg = "Circular task dependencies detected:\n\nb <- a <- c <- b\n" - - // await expectError( - // () => graph.process([taskB]), - // (err) => expect(err.message).to.eql(errorMsg) - // ) - // }) - - // it("should emit events when processing and completing a task", async () => { - // const now = freezeTime() - - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - // const task = new TestTask(garden, "a", false) - // await graph.process([task]) - - // garden.events.eventLog = [] - - // // repeatedTask has the same key and version as task, so its result is already cached - // const repeatedTask = new TestTask(garden, "a", false) - // const results = await graph.process([repeatedTask]) - - // expect(garden.events.eventLog).to.eql([ - // { name: "taskGraphProcessing", payload: { startedAt: now } }, - // { - // name: "taskComplete", - // payload: toGraphResultEventPayload(results["a"]!), - // }, - // { name: "taskGraphComplete", payload: { completedAt: now } }, - // ]) - // }) - - // it("should emit a taskError event when failing a task", async () => { - // const now = freezeTime() - - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - // const task = new TestTask(garden, "a", false, { throwError: true }) - - // const result = await graph.process([task]) - // const generatedBatchId = result?.a?.batchId || uuidv4() - - // expect(garden.events.eventLog).to.eql([ - // { name: "taskGraphProcessing", payload: { startedAt: now } }, - // { - // name: "taskPending", - // payload: { - // addedAt: now, - // batchId: generatedBatchId, - // key: task.getBaseKey(), - // name: task.name, - // type: task.type, - // }, - // }, - // { - // name: "taskProcessing", - // payload: { - // startedAt: now, - // batchId: generatedBatchId, - // key: task.getBaseKey(), - // name: task.name, - // type: task.type, - // versionString: task.version, - // }, - // }, - // { name: "taskError", payload: sanitizeValue(result["a"]) }, - // { name: "taskGraphComplete", payload: { completedAt: now } }, - // ]) - // }) - - // it("should have error property inside taskError event when failing a task", async () => { - // freezeTime() - - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - // const task = new TestTask(garden, "a", false, { throwError: true }) - - // await graph.process([task]) - // const taskError = garden.events.eventLog.find((obj) => obj.name === "taskError") - - // expect(taskError && taskError.payload["error"]).to.exist - // }) - - // it("should throw on task error if throwOnError is set", async () => { - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - // const task = new TestTask(garden, "a", false, { throwError: true }) - - // await expectError( - // () => graph.process([task], { throwOnError: true }), - // (err) => expect(err.message).to.include("action(s) failed") - // ) - // }) - - // it("should include any task errors in task results", async () => { - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - // const taskA = new TestTask(garden, "a", false, { throwError: true }) - // const taskB = new TestTask(garden, "b", false, { throwError: true }) - // const taskC = new TestTask(garden, "c", false) - - // const results = await graph.process([taskA, taskB, taskC]) - - // expect(results.a!.error).to.exist - // expect(results.b!.error).to.exist - // expect(results.c!.error).to.not.exist - // }) - - // it("should process multiple tasks in dependency order", async () => { - // const now = freezeTime() - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - - // const callbackResults = {} - // const resultOrder: string[] = [] - - // const callback = async (key: string, result: any) => { - // resultOrder.push(key) - // callbackResults[key] = result - // } - - // const opts = { callback } - - // const taskA = new TestTask(garden, "a", false, { ...opts, dependencies: [], uid: "a1" }) - // const taskB = new TestTask(garden, "b", false, { ...opts, dependencies: [taskA], uid: "b1" }) - // const taskC = new TestTask(garden, "c", false, { ...opts, dependencies: [taskB], uid: "c1" }) - // const taskD = new TestTask(garden, "d", false, { ...opts, dependencies: [taskB, taskC], uid: "d1" }) - - // // we should be able to add tasks multiple times and in any order - // const results = await graph.process([taskA, taskB, taskC, taskC, taskD, taskA, taskD, taskB, taskD, taskA]) - // const generatedBatchId = results?.a?.batchId || uuidv4() - - // // repeat - - // const repeatCallbackResults = {} - // const repeatResultOrder: string[] = [] - - // const repeatCallback = async (key: string, result: any) => { - // repeatResultOrder.push(key) - // repeatCallbackResults[key] = result - // } - - // const repeatOpts = { callback: repeatCallback } - - // const repeatTaskA = new TestTask(garden, "a", false, { ...repeatOpts, dependencies: [], uid: "a2" }) - // const repeatTaskB = new TestTask(garden, "b", false, { ...repeatOpts, dependencies: [repeatTaskA], uid: "b2" }) - // const repeatTaskC = new TestTask(garden, "c", true, { ...repeatOpts, dependencies: [repeatTaskB], uid: "c2" }) - - // const repeatTaskAforced = new TestTask(garden, "a", true, { ...repeatOpts, dependencies: [], uid: "a2f" }) - // const repeatTaskBforced = new TestTask(garden, "b", true, { - // ...repeatOpts, - // dependencies: [repeatTaskA], - // uid: "b2f", - // }) - - // await graph.process([repeatTaskBforced, repeatTaskAforced, repeatTaskC]) - - // const resultA: GraphResult = { - // type: "test", - // description: "a.a1", - // key: "a", - // name: "a", - // startedAt: now, - // completedAt: now, - // batchId: generatedBatchId, - // result: { - // result: "result-a.a1", - // dependencyResults: {}, - // }, - // dependencyResults: {}, - // version: taskA.version, - // } - // const resultB: GraphResult = { - // type: "test", - // key: "b", - // name: "b", - // description: "b.b1", - // startedAt: now, - // completedAt: now, - // batchId: generatedBatchId, - // result: { - // result: "result-b.b1", - // dependencyResults: { a: resultA }, - // }, - // dependencyResults: { a: resultA }, - // version: taskB.version, - // } - // const resultC: GraphResult = { - // type: "test", - // description: "c.c1", - // key: "c", - // name: "c", - // startedAt: now, - // completedAt: now, - // batchId: generatedBatchId, - // result: { - // result: "result-c.c1", - // dependencyResults: { b: resultB }, - // }, - // dependencyResults: { b: resultB }, - // version: taskC.version, - // } - - // const expected: GraphResults = { - // a: resultA, - // b: resultB, - // c: resultC, - // d: { - // type: "test", - // description: "d.d1", - // key: "d", - // name: "d", - // startedAt: now, - // completedAt: now, - // batchId: generatedBatchId, - // result: { - // result: "result-d.d1", - // dependencyResults: { - // b: resultB, - // c: resultC, - // }, - // }, - // dependencyResults: { - // b: resultB, - // c: resultC, - // }, - // version: taskD.version, - // }, - // } - - // expect(results).to.eql(expected, "Wrong results after initial add and process") - // expect(resultOrder).to.eql(["a.a1", "b.b1", "c.c1", "d.d1"], "Wrong result order after initial add and process") - - // expect(callbackResults).to.eql( - // { - // "a.a1": "result-a.a1", - // "b.b1": "result-b.b1", - // "c.c1": "result-c.c1", - // "d.d1": "result-d.d1", - // }, - // "Wrong callbackResults after initial add and process" - // ) - - // expect(repeatResultOrder).to.eql(["a.a2f", "b.b2f", "c.c2"], "Wrong result order after repeat add & process") - - // expect(repeatCallbackResults).to.eql( - // { - // "a.a2f": "result-a.a2f", - // "b.b2f": "result-b.b2f", - // "c.c2": "result-c.c2", - // }, - // "Wrong callbackResults after repeat add & process" - // ) - // }) - - // it("should add at most one pending task for a given key", async () => { - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - - // const processedVersions: string[] = [] - - // const { promise: t1StartedPromise, resolver: t1StartedResolver } = defer() - // const { promise: t1DonePromise, resolver: t1DoneResolver } = defer() - - // const t1 = new TestTask(garden, "a", false, { - // versionString: "1", - // uid: "1", - // callback: async () => { - // t1StartedResolver() - // processedVersions.push("1") - // await t1DonePromise - // }, - // }) - - // const repeatedCallback = (version: string) => { - // return async () => { - // processedVersions.push(version) - // } - // } - // const t2 = new TestTask(garden, "a", false, { uid: "2", versionString: "2", callback: repeatedCallback("2") }) - // const t3 = new TestTask(garden, "a", false, { uid: "3", versionString: "3", callback: repeatedCallback("3") }) - - // const firstProcess = graph.process([t1]) - - // // We make sure t1 is being processed before adding t2 and t3. Since t3 is added after t2, - // // only t1 and t3 should be processed (since t2 and t3 have the same key, "a"). - // await t1StartedPromise - // const secondProcess = graph.process([t2]) - // const thirdProcess = graph.process([t3]) - // await sleep(200) // TODO: Get rid of this? - // t1DoneResolver() - // await Promise.all([firstProcess, secondProcess, thirdProcess]) - // expect(processedVersions).to.eql(["1", "3"]) - // }) - - // TODO-G2: not implemented - // it("should process requests with unrelated tasks concurrently", async () => { - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - - // const resultOrder: string[] = [] - - // const callback = async (key: string) => { - // resultOrder.push(key) - // } - - // const { resolver: aStartedResolver } = defer() - // const { promise: aDonePromise, resolver: aDoneResolver } = defer() - - // const opts = { callback } - // const taskADep1 = new TestTask(garden, "a-dep1", false, { ...opts }) - // const taskADep2 = new TestTask(garden, "a-dep2", false, { ...opts }) - - // const taskA = new TestTask(garden, "a", false, { - // dependencies: [taskADep1, taskADep2], - // callback: async () => { - // aStartedResolver() - // resultOrder.push("a") - // await aDonePromise - // }, - // }) - - // const taskBDep = new TestTask(garden, "b-dep", false, { ...opts }) - // const taskB = new TestTask(garden, "b", false, { ...opts, dependencies: [taskBDep] }) - // const taskC = new TestTask(garden, "c", false, { ...opts }) - - // const firstProcess = graph.process([taskA, taskADep1, taskADep2]) - // const secondProcess = graph.process([taskB, taskBDep]) - // const thirdProcess = graph.process([taskC]) - // aDoneResolver() - // await Promise.all([firstProcess, secondProcess, thirdProcess]) - // expect(resultOrder).to.eql(["c", "a-dep1", "a-dep2", "b-dep", "a", "b"]) - // }) - - // it("should process two requests with related tasks sequentially", async () => { - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - - // const resultOrder: string[] = [] - - // const callback = async (key: string) => { - // resultOrder.push(key) - // } - - // const { resolver: aStartedResolver } = defer() - // const { promise: aDonePromise, resolver: aDoneResolver } = defer() - - // const opts = { callback } - // const taskADep = new TestTask(garden, "a-dep1", true, { ...opts }) - - // const taskA = new TestTask(garden, "a", true, { - // dependencies: [taskADep], - // callback: async () => { - // aStartedResolver() - // resultOrder.push("a") - // await aDonePromise - // }, - // }) - - // const repeatTaskBDep = new TestTask(garden, "b-dep", true, { ...opts }) - - // const firstProcess = graph.process([taskA, taskADep]) - // const secondProcess = graph.process([repeatTaskBDep]) - // aDoneResolver() - // await Promise.all([firstProcess, secondProcess]) - // expect(resultOrder).to.eql(["b-dep", "a-dep1", "a"]) - // }) - - // it("should enforce a hard concurrency limit on task processing", async () => { - // const garden = await getGarden() - // const tasks = range(0, 10).map((n) => new TestTask(garden, "task-" + n, false)) - // const limit = 3 - // const graph = new TaskGraph(garden, garden.log, limit) - // let gotEvents = false - - // graph.on("process", (event) => { - // gotEvents = true - // // Ensure we never go over the hard limit - // expect(event.keys.length + event.inProgress.length).to.lte(limit) - // }) - - // await graph.process(tasks) - - // expect(gotEvents).to.be.true - // }) - - // it("should enforce a concurrency limit per task type", async () => { - // const garden = await getGarden() - // const limit = 2 - - // class TaskTypeA extends TestTask { - // type = "a" - // concurrencyLimit = limit - // } - - // class TaskTypeB extends TestTask { - // type = "b" - // concurrencyLimit = limit - // } - - // const tasks = [ - // ...range(0, 10).map((n) => new TaskTypeA(garden, "a-" + n, false)), - // ...range(0, 10).map((n) => new TaskTypeB(garden, "b-" + n, false)), - // ] - - // const graph = new TaskGraph(garden, garden.log) - - // let gotEvents = false - - // graph.on("process", (event) => { - // gotEvents = true - // // Ensure not more than two of each task type run concurrently - // for (const type of ["a", "b"]) { - // const keys = [...event.keys, ...event.inProgress].filter((key) => key.startsWith(type)) - // expect(keys.length).to.lte(limit) - // } - // }) - - // await graph.process(tasks) - - // expect(gotEvents).to.be.true - // }) - - // it("should recursively cancel a task's dependants when it throws an error", async () => { - // const now = freezeTime() - // const garden = await getGarden() - // const graph = new TaskGraph(garden, garden.log) - - // const resultOrder: string[] = [] - - // const callback = async (key: string) => { - // resultOrder.push(key) - // } - - // const opts = { callback } - - // const taskA = new TestTask(garden, "a", true, { ...opts }) - // const taskB = new TestTask(garden, "b", true, { callback, throwError: true, dependencies: [taskA] }) - // const taskC = new TestTask(garden, "c", true, { ...opts, dependencies: [taskB] }) - // const taskD = new TestTask(garden, "d", true, { ...opts, dependencies: [taskB, taskC] }) - - // const results = await graph.process([taskA, taskB, taskC, taskD]) - - // const generatedBatchId = results?.a?.batchId || uuidv4() - - // const resultA: GraphResult = { - // type: "test", - // description: "a", - // key: "a", - // name: "a", - // startedAt: now, - // completedAt: now, - // batchId: generatedBatchId, - // result: { - // result: "result-a", - // dependencyResults: {}, - // }, - // dependencyResults: {}, - // version: taskA.version, - // } - - // const filteredKeys: Set = new Set([ - // "version", - // "versionString", - // "error", - // "addedAt", - // "startedAt", - // "cancelledAt", - // "completedAt", - // ]) - - // const filteredEventLog = garden.events.eventLog.map((e) => { - // return deepFilter(e, (_, key) => !filteredKeys.has(key)) - // }) - - // expect(results.a).to.eql(resultA) - // expect(results.b).to.have.property("error") - // expect(resultOrder).to.eql(["a", "b"]) - // expect(filteredEventLog).to.eql([ - // { name: "taskGraphProcessing", payload: {} }, - // { name: "taskPending", payload: { key: "a", name: "a", type: "test", batchId: generatedBatchId } }, - // { name: "taskPending", payload: { key: "b", name: "b", type: "test", batchId: generatedBatchId } }, - // { name: "taskPending", payload: { key: "c", name: "c", type: "test", batchId: generatedBatchId } }, - // { name: "taskPending", payload: { key: "d", name: "d", type: "test", batchId: generatedBatchId } }, - // { name: "taskProcessing", payload: { key: "a", name: "a", type: "test", batchId: generatedBatchId } }, - // { - // name: "taskComplete", - // payload: { - // description: "a", - // key: "a", - // name: "a", - // type: "test", - // batchId: generatedBatchId, - // output: { result: "result-a" }, - // }, - // }, - // { name: "taskProcessing", payload: { key: "b", name: "b", type: "test", batchId: generatedBatchId } }, - // { - // name: "taskError", - // payload: { description: "b", key: "b", name: "b", type: "test", batchId: generatedBatchId }, - // }, - // { name: "taskCancelled", payload: { key: "c", name: "c", type: "test", batchId: generatedBatchId } }, - // { name: "taskCancelled", payload: { key: "d", name: "d", type: "test", batchId: generatedBatchId } }, - // { name: "taskCancelled", payload: { key: "d", name: "d", type: "test", batchId: generatedBatchId } }, - // { name: "taskGraphComplete", payload: {} }, - // ]) - // }) + it("respects the concurrency limit specified by each node", async () => { + // We use different status & execute limits for the two groups of tasks here. + const statusLimitA = 2 + const executeLimitA = 1 + const statusLimitB = 3 + const executeLimitB = 2 + const groupATasks = range(0, 3).map((n) => + makeTask({ name: `task-a-${n}`, statusConcurrencyLimit: statusLimitA, executeConcurrencyLimit: executeLimitA }) + ) + const groupBTasks = range(0, 3).map((n) => + makeTask({ name: `task-b-${n}`, statusConcurrencyLimit: statusLimitB, executeConcurrencyLimit: executeLimitB }) + ) + + const processedBatches: string[][] = [] + garden["solver"].on("process", (event) => { + processedBatches.push(event.keys) + }) + + await garden.processTasks({ tasks: [...groupATasks, ...groupBTasks], throwOnError: false }) + + expect(processedBatches[0].sort()).to.eql( + [ + "test.task-a-0:status", + "test.task-a-1:status", + "test.task-b-0:status", + "test.task-b-1:status", + "test.task-b-2:status", + ].sort() + ) + + expect(processedBatches[1].sort()).to.eql( + ["test.task-a-2:status", "test.task-a-0:process", "test.task-b-0:process", "test.task-b-1:process"].sort() + ) + + expect(processedBatches[2].sort()).to.eql(["test.task-a-1:process", "test.task-b-2:process"]) + + expect(processedBatches[3].sort()).to.eql(["test.task-a-2:process"]) + }) })