Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Custom Balancer #590

Merged
merged 98 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 88 commits
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
2cf4b46
refactor: emit `drain` as soon as capacity is available (#367)
metcoder95 Jul 14, 2023
7b86105
refactor!: drop runTask method (#363)
metcoder95 Sep 21, 2023
b42bb14
chore(deps-dev): Bump @types/node from 20.6.0 to 20.6.2 (#406)
dependabot[bot] Sep 19, 2023
4df0ca4
fix: add signal reason support (#403)
metcoder95 Sep 20, 2023
986659a
chore(deps): Bump actions/checkout from 3 to 4 (#413)
dependabot[bot] Oct 2, 2023
bf92e5a
chore(deps-dev): Bump @types/node from 20.6.2 to 20.8.2 (#417)
dependabot[bot] Oct 2, 2023
352bc84
chore(deps-dev): Bump @types/node from 20.8.2 to 20.8.3 (#419)
dependabot[bot] Oct 15, 2023
a998a69
chore(deps): Bump @babel/traverse (#425)
dependabot[bot] Oct 20, 2023
31f7a2b
fix: do not re-create threads when calling `.destory()` (#430)
alan-agius4 Oct 20, 2023
3d67926
chore(deps-dev): Bump @types/node from 20.8.3 to 20.8.7 (#428)
dependabot[bot] Oct 23, 2023
5c2264c
fix: migrate to EventEmitterAsyncResource from core (#433)
groozin Oct 30, 2023
69cae53
chore(deps): Bump actions/setup-node from 3 to 4 (#437)
dependabot[bot] Nov 6, 2023
0cfa8d7
chore(deps-dev): Bump @types/node from 20.8.7 to 20.8.10 (#440)
dependabot[bot] Nov 6, 2023
c9e2abc
chore(deps-dev): Bump @typescript-eslint/parser from 6.9.1 to 6.10.0 …
dependabot[bot] Nov 13, 2023
ef6e205
chore(deps-dev): Bump @types/node from 20.8.10 to 20.9.0 (#443)
dependabot[bot] Nov 13, 2023
1f85c16
chore(deps-dev): Bump @typescript-eslint/eslint-plugin from 6.9.1 to …
dependabot[bot] Nov 15, 2023
b7895b9
chore(release): 4.2.0
metcoder95 Nov 19, 2023
cc8fca5
chore(deps-dev): Bump @types/node from 20.9.0 to 20.9.2 (#448)
dependabot[bot] Nov 20, 2023
4260e7e
chore(deps-dev): Bump @typescript-eslint/parser from 6.10.0 to 6.11.0…
dependabot[bot] Nov 20, 2023
99ae84b
chore(deps-dev): Bump @typescript-eslint/eslint-plugin from 6.10.0 to…
dependabot[bot] Nov 21, 2023
27cac34
chore(deps-dev): Bump @typescript-eslint/eslint-plugin from 6.11.0 to…
dependabot[bot] Nov 27, 2023
b6e2559
chore(deps-dev): Bump @typescript-eslint/parser from 6.11.0 to 6.12.0…
dependabot[bot] Nov 27, 2023
b08a78e
chore(deps-dev): Bump @types/node from 20.9.2 to 20.10.0 (#455)
dependabot[bot] Nov 27, 2023
be15076
chore(deps-dev): Bump typescript from 5.1.6 to 5.3.2 (#454)
dependabot[bot] Nov 27, 2023
a150938
fix: default minThreads with odd CPU count (#457)
Chocobozzz Dec 3, 2023
6ea35db
chore(deps-dev): Bump @types/node from 20.10.0 to 20.10.4 (#466)
dependabot[bot] Dec 13, 2023
41408c1
chore(deps-dev): Bump ts-node from 9.1.1 to 10.9.2 (#463)
dependabot[bot] Dec 13, 2023
682a931
chore(release): 4.2.1
metcoder95 Dec 13, 2023
d30c6b3
chore(deps-dev): Bump @typescript-eslint/parser from 6.12.0 to 6.14.0…
dependabot[bot] Dec 13, 2023
89ee83a
chore(deps-dev): Bump typescript from 5.3.2 to 5.3.3 (#464)
dependabot[bot] Dec 13, 2023
8deb653
chore(deps-dev): Bump @typescript-eslint/eslint-plugin from 6.12.0 to…
dependabot[bot] Dec 13, 2023
88b40d8
chore(deps): Bump @babel/traverse from 7.22.8 to 7.23.2 (#469)
dependabot[bot] Dec 13, 2023
8dadd43
chore(deps-dev): Bump @types/node from 20.10.4 to 20.10.5 (#470)
dependabot[bot] Dec 19, 2023
d84adb3
chore(deps-dev): Bump @typescript-eslint/eslint-plugin from 6.14.0 to…
dependabot[bot] Dec 28, 2023
c1996d4
chore(deps-dev): Bump @typescript-eslint/parser from 6.14.0 to 6.16.0…
dependabot[bot] Dec 28, 2023
cff5414
chore(deps-dev): Bump @types/node from 20.10.5 to 20.10.6 (#476)
dependabot[bot] Jan 3, 2024
ef07d02
chore(deps-dev): Bump @typescript-eslint/eslint-plugin from 6.15.0 to…
dependabot[bot] Jan 3, 2024
40b5e6b
chore(deps-dev): Bump @types/node from 20.10.6 to 20.10.7 (#479)
dependabot[bot] Jan 12, 2024
a3571fd
feat: use native Node.js histogram support (#482)
clydin Jan 12, 2024
e848054
chore(deps-dev): Bump @types/node from 20.11.0 to 20.11.1 (#485)
dependabot[bot] Jan 16, 2024
79d7401
chore(release): 4.3.0
metcoder95 Jan 16, 2024
aabaa5b
chore(deps-dev): Bump @typescript-eslint/parser from 6.18.1 to 6.19.0…
dependabot[bot] Jan 26, 2024
177a58d
chore: workflows: drop v16.x (#495)
RafaelGSS Jan 27, 2024
6f4ef50
fix(#491): out of bounds histogram value (#496)
metcoder95 Jan 28, 2024
c1f533d
chore(release): 4.3.1
metcoder95 Jan 30, 2024
335e406
chore(deps-dev): Bump @typescript-eslint/eslint-plugin from 6.18.1 to…
dependabot[bot] Jan 31, 2024
be9e83b
fix(#513): forward errors correctly to Piscina (#514)
metcoder95 Feb 16, 2024
86a5b21
chore(release): 4.3.2
metcoder95 Feb 16, 2024
e0686f6
chore(deps-dev): Bump @types/node from 20.11.1 to 20.11.19 (#516)
dependabot[bot] Feb 21, 2024
07b03eb
chore(deps): Bump actions/cache from 3 to 4 (#505)
dependabot[bot] Feb 21, 2024
01726c3
feat: add option to disable run/wait time recording (#518)
clydin Feb 21, 2024
ff8dee0
feat: allow named import usage (#517)
clydin Feb 21, 2024
683cda4
chore(release): 4.4.0
metcoder95 Feb 28, 2024
b2d7e62
chore(deps-dev): Bump @types/node from 20.11.19 to 20.11.24 (#524)
dependabot[bot] Mar 6, 2024
eb3008c
feat!: narrow TS types for histograms
metcoder95 Mar 10, 2024
2877294
chore: add v21.x to CI
metcoder95 Mar 10, 2024
b60d8ba
chore: adjust cache
metcoder95 Mar 10, 2024
4b391ac
chore: update package-lock.json
metcoder95 Mar 10, 2024
4bb872f
Merge remote-tracking branch 'origin/current' into next
metcoder95 May 29, 2024
57570c6
refactor: cleanups
metcoder95 May 29, 2024
6cbc507
fix: lint
metcoder95 May 29, 2024
7e40e2d
Merge remote-tracking branch 'origin/current' into next
metcoder95 May 31, 2024
d701046
fix: merge with current
metcoder95 May 31, 2024
144bd1c
feat!: set FixedQueue as default task queue (#578)
metcoder95 Jun 4, 2024
363b5af
docs: remove runtask references (#581)
metcoder95 Jun 12, 2024
3729907
feat!: drop v16 (#582)
metcoder95 Jun 12, 2024
2f58db9
feat: initial shape
metcoder95 Jun 12, 2024
e72310a
Merge branch 'next' into feat/custom_balancer
metcoder95 Jun 12, 2024
f9af8a4
feat: implement balancer
metcoder95 Jun 19, 2024
f2ad1a3
fix: onclose
metcoder95 Jun 19, 2024
22e9642
fix: smaller tweaks
metcoder95 Jun 23, 2024
c5946d1
Merge remote-tracking branch 'origin/current' into next
metcoder95 Jul 21, 2024
4417fc1
Merge remote-tracking branch 'origin/next' into feat/custom_balancer
metcoder95 Jul 24, 2024
93d9c52
test: fix
metcoder95 Jul 24, 2024
3634822
feat: add histogram to workers (#619)
metcoder95 Jul 25, 2024
176c399
feat: add state properties to worker (#620)
metcoder95 Aug 2, 2024
267bde4
Merge remote-tracking branch 'origin/current' into next
metcoder95 Sep 15, 2024
53e41f4
chore: reconcile lockfile
metcoder95 Sep 15, 2024
bbb684d
docs: update README
metcoder95 Sep 15, 2024
a828c8d
Merge remote-tracking branch 'origin/current' into next
metcoder95 Sep 18, 2024
a8447c0
fix: bad merge
metcoder95 Sep 18, 2024
b2463b7
Merge branch 'next' into feat/custom_balancer
metcoder95 Sep 18, 2024
f329b81
feat: pool events for workers (#624)
metcoder95 Sep 20, 2024
0dfc689
feat: add worker events to pool (#625)
metcoder95 Oct 2, 2024
65c81f8
Merge remote-tracking branch 'origin' into feat/custom_balancer
metcoder95 Oct 2, 2024
fe6968d
refactor: drop commands from balancer
metcoder95 Oct 2, 2024
93a1504
refactor: cleanup
metcoder95 Oct 2, 2024
bb0aceb
chore: fix lockfile
metcoder95 Oct 2, 2024
34e3ef2
refactor: Update src/index.ts
metcoder95 Oct 3, 2024
d82b8e2
fix: linting
metcoder95 Oct 11, 2024
6484ba8
Merge branch 'current' into feat/custom_balancer
metcoder95 Oct 15, 2024
d9a7b9f
refactor: use performance.now instead of process.hrtime
metcoder95 Oct 16, 2024
8714a6f
refactor: decouple shcheduling from distribution
metcoder95 Oct 16, 2024
27b1f84
refactor: small tweaks
metcoder95 Oct 16, 2024
750646c
types: adjust types and documentation
metcoder95 Oct 25, 2024
86f265e
docs: add documentation
metcoder95 Oct 27, 2024
4093dfa
Merge remote-tracking branch 'origin/current' into feat/custom_balancer
metcoder95 Oct 30, 2024
c3b6fad
Merge remote-tracking branch 'origin/current' into feat/custom_balancer
metcoder95 Nov 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion docs/docs/api-reference/event.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,16 @@ by number of tasks enqueued that are pending of execution.

## Event: `'message'`

A `'message'` event is emitted whenever a message is received from a worker thread.
A `'message'` event is emitted whenever a message is received from a worker thread.

## Event: `'workerCreate'`

Event that is triggered when a new worker is created.

As argument, it receives the worker instance.

## Event: `'workerDestroy'`

Event that is triggered when a worker is destroyed.

As argument, it receives the worker instance that has been destroyed.
2,352 changes: 1,491 additions & 861 deletions package-lock.json

Large diffs are not rendered by default.

189 changes: 121 additions & 68 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import type {
import {
kQueueOptions,
kTransferable,
kValue
kValue,
kWorkerData
} from './symbols';
import {
TaskQueue,
Expand All @@ -32,7 +33,10 @@ import {
} from './task_queue';
import {
WorkerInfo,
AsynchronouslyCreatedResourcePool
AsynchronouslyCreatedResourcePool,
PiscinaTaskBalancer,
PiscinaWorker,
ResourceBasedBalancer
} from './worker_pool';
import {
AbortSignalAny,
Expand Down Expand Up @@ -71,7 +75,9 @@ interface Options {
niceIncrement? : number,
trackUnmanagedFds? : boolean,
closeTimeout?: number,
recordTiming?: boolean
recordTiming?: boolean,
loadBalancer?: PiscinaTaskBalancer,
workerHistogram?: boolean,
}

interface FilledOptions extends Options {
Expand All @@ -86,7 +92,8 @@ interface FilledOptions extends Options {
taskQueue : TaskQueue,
niceIncrement : number,
closeTimeout : number,
recordTiming : boolean
recordTiming : boolean,
workerHistogram: boolean,
}

interface RunOptions {
Expand Down Expand Up @@ -120,7 +127,8 @@ const kDefaultOptions : FilledOptions = {
niceIncrement: 0,
trackUnmanagedFds: true,
closeTimeout: 30000,
recordTiming: true
recordTiming: true,
workerHistogram: false
};

const kDefaultRunOptions : FilledRunOptions = {
Expand Down Expand Up @@ -173,6 +181,7 @@ class ThreadPool {
workerFailsDuringBootstrap : boolean = false;
destroying : boolean = false;
maxCapacity: number;
balancer: PiscinaTaskBalancer;

constructor (publicInterface : Piscina, options : Options) {
this.publicInterface = publicInterface;
Expand Down Expand Up @@ -202,9 +211,10 @@ class ThreadPool {
this.options.maxQueue = options.maxQueue ?? kDefaultOptions.maxQueue;
}

this.balancer = this.options.loadBalancer ?? ResourceBasedBalancer({ maximumUsage: this.options.concurrentTasksPerWorker });
this.workers = new AsynchronouslyCreatedResourcePool<WorkerInfo>(
this.options.concurrentTasksPerWorker);
this.workers.onAvailable((w : WorkerInfo) => this._onWorkerAvailable(w));
this.workers.onTaskDone((w : WorkerInfo) => this._onWorkerTaskDone(w));
this.maxCapacity = this.options.maxThreads * this.options.concurrentTasksPerWorker;

this.startingUp = true;
Expand All @@ -223,6 +233,8 @@ class ThreadPool {
}

_addNewWorker () : void {
if (this.closingUp) return;

const pool = this;
const worker = new Worker(resolve(__dirname, 'worker.js'), {
env: this.options.env,
Expand All @@ -234,11 +246,27 @@ class ThreadPool {
});

const { port1, port2 } = new MessageChannel();
const workerInfo = new WorkerInfo(worker, port1, onMessage);
const workerInfo = new WorkerInfo(worker, port1, onMessage, this.options.workerHistogram);

workerInfo.onDestroy(() => {
this.publicInterface.emit('workerDestroy', workerInfo.interface);
});

if (this.startingUp) {
// There is no point in waiting for the initial set of Workers to indicate
// that they are ready, we just mark them as such from the start.
workerInfo.markAsReady();
// We need to emit the event in the next microtask, so that the user can
// attach event listeners before the event is emitted.
queueMicrotask(() => {
this.publicInterface.emit('workerCreate', workerInfo.interface);
this._onWorkerReady(workerInfo);
});
} else {
workerInfo.onReady(() => {
this.publicInterface.emit('workerCreate', workerInfo.interface);
this._onWorkerReady(workerInfo);
});
}

const message : StartupMessage = {
Expand All @@ -259,7 +287,9 @@ class ThreadPool {
const taskInfo = workerInfo.taskInfos.get(taskId);
workerInfo.taskInfos.delete(taskId);

pool.workers.maybeAvailable(workerInfo);
// TODO: we can abstract the task info handling
// right into the pool.workers.taskDone method
pool.workers.taskDone(workerInfo);

/* istanbul ignore if */
if (taskInfo === undefined) {
Expand Down Expand Up @@ -369,39 +399,81 @@ class ThreadPool {
this.workers.delete(workerInfo);
}

_onWorkerReady (workerInfo : WorkerInfo) : void {
this._onWorkerAvailable(workerInfo);
}

_onWorkerTaskDone (workerInfo: WorkerInfo) : void {
this._onWorkerAvailable(workerInfo);
}

_onWorkerAvailable (workerInfo : WorkerInfo) : void {
while ((this.taskQueue.size > 0 || this.skipQueue.length > 0) &&
workerInfo.currentUsage() < this.options.concurrentTasksPerWorker) {
let workers: PiscinaWorker[] | null = null;
while ((this.taskQueue.size > 0 || this.skipQueue.length > 0)) {
// The skipQueue will have tasks that we previously shifted off
// the task queue but had to skip over... we have to make sure
// we drain that before we drain the taskQueue.
const taskInfo = this.skipQueue.shift() ||
this.taskQueue.shift() as TaskInfo;
// If the task has an abortSignal and the worker has any other
// tasks, we cannot distribute the task to it. Skip for now.
if (taskInfo.abortSignal && workerInfo.taskInfos.size > 0) {
this.skipQueue.push(taskInfo);
break;

if (workers == null) {
workers = [...this.workers].map(workerInfo => workerInfo.interface);
}
const now = performance.now();
this.waitTime?.record(toHistogramIntegerNano(now - taskInfo.created));
taskInfo.started = now;
workerInfo.postTask(taskInfo);
this._maybeDrain();
return;

const distributed = this._distributeTask(taskInfo, workers);

// If task was distributed, we should continue to distribute more tasks
if (distributed) {
continue;
// If balancer states that pool is busy, we should stop trying to distribute tasks
} else { break; }
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved
}

if (workerInfo.taskInfos.size === 0 &&
// If more workers than minThreads, we can remove idle workers
if (workerInfo.currentUsage() === 0 &&
this.workers.size > this.options.minThreads) {
workerInfo.idleTimeout = setTimeout(() => {
assert.strictEqual(workerInfo.taskInfos.size, 0);
assert.strictEqual(workerInfo.currentUsage(), 0);
if (this.workers.size > this.options.minThreads) {
this._removeWorker(workerInfo);
}
}, this.options.idleTimeout).unref();
}
}

_distributeTask (task: TaskInfo, workers: PiscinaWorker[]): boolean {
// We need to verify if the task is aborted already or not
// otherwise we might be distributing aborted tasks to workers
if (task.aborted) return false;

const candidate = this.balancer(task.interface, workers);

// Seeking for a real worker instead of customized one
if (candidate != null && candidate[kWorkerData] != null) {
const now = performance.now();
this.waitTime?.record(toHistogramIntegerNano(now - task.created));
task.started = now;
candidate[kWorkerData].postTask(task);
this._maybeDrain();
// If candidate, let's try to distribute more tasks
return true;
}

// We spawn if possible
// TODO: scheduler will intercept this.
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved
if (this.workers.size < this.options.maxThreads) {
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved
this._addNewWorker();
Copy link

@jerome-benoit jerome-benoit Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the worker histogram initialized in that case?
If a balancer query the runtime and that is initialized to zero, the balancer will wrongly pickup only new workers.
The same goes for worker recreation in the error case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For each new worker, the histogram is instantiated to zero, but the whole WorkerPool has a global histogram that can be queried.

Nevertheless, to query it, you need to have a reference to the Worker Pool (Piscina instance). Maybe worth it to pass a reference to the global Piscina instance so it can query this data

Copy link

@jerome-benoit jerome-benoit Oct 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. Or you can initialize to the pool one if it's not a cumulative histogram.
On poolifier, each entries init to the min of worker matching entries.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be useful, I'll follow that path; thanks!

}

if (task.abortSignal) {
metcoder95 marked this conversation as resolved.
Show resolved Hide resolved
this.skipQueue.push(task);
} else {
this.taskQueue.push(task);
}

return false;
}

runTask (
task : any,
options : RunOptions) : Promise<any> {
Expand All @@ -424,9 +496,9 @@ class ThreadPool {
filename = maybeFileURLToPath(filename);

let signal: AbortSignalAny | null;
if (this.closingUp) {
if (this.closingUp || this.destroying) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between closingUp and destroying flag? I think the two can be merged.

const closingUpAbortController = new AbortController();
closingUpAbortController.abort('queue is closing up');
closingUpAbortController.abort('queue is being terminated');

signal = closingUpAbortController.signal;
} else {
Expand Down Expand Up @@ -462,8 +534,10 @@ class ThreadPool {
// If the AbortSignal has an aborted property and it's truthy,
// reject immediately.
if ((signal as AbortSignalEventTarget).aborted) {
return Promise.reject(new AbortError((signal as AbortSignalEventTarget).reason));
reject!(new AbortError((signal as AbortSignalEventTarget).reason));
return ret;
}

taskInfo.abortListener = () => {
// Call reject() first to make sure we always reject with the AbortError
// if the task is aborted, not with an Error from the possible
Expand All @@ -476,68 +550,40 @@ class ThreadPool {
this._ensureMinimumWorkers();
} else {
// Not yet running: Remove it from the queue.
// Call should be idempotent
this.taskQueue.remove(taskInfo);
}
};

onabort(signal, taskInfo.abortListener);
}

// If there is a task queue, there's no point in looking for an available
// Worker thread. Add this task to the queue, if possible.
if (this.taskQueue.size > 0) {
const totalCapacity = this.options.maxQueue + this.pendingCapacity();
if (this.taskQueue.size >= totalCapacity) {
if (this.options.maxQueue === 0) {
return Promise.reject(Errors.NoTaskQueueAvailable());
reject!(Errors.NoTaskQueueAvailable());
} else {
return Promise.reject(Errors.TaskQueueAtLimit());
reject!(Errors.TaskQueueAtLimit());
}
} else {
if (this.workers.size < this.options.maxThreads) {
this._addNewWorker();
}
this.taskQueue.push(taskInfo);
}

this._maybeDrain();
return ret;
}

// Look for a Worker with a minimum number of tasks it is currently running.
let workerInfo : WorkerInfo | null = this.workers.findAvailable();

// If we want the ability to abort this task, use only workers that have
// no running tasks.
if (workerInfo !== null && workerInfo.currentUsage() > 0 && signal) {
workerInfo = null;
}

// If no Worker was found, or that Worker was handling another task in some
// way, and we still have the ability to spawn new threads, do so.
let waitingForNewWorker = false;
if ((workerInfo === null || workerInfo.currentUsage() > 0) &&
this.workers.size < this.options.maxThreads) {
this._addNewWorker();
waitingForNewWorker = true;
}
const workers = [...this.workers.readyItems].map(workerInfo => workerInfo.interface);
const distributed = this._distributeTask(taskInfo, workers);

// If no Worker is found, try to put the task into the queue.
if (workerInfo === null) {
if (this.options.maxQueue <= 0 && !waitingForNewWorker) {
return Promise.reject(Errors.NoTaskQueueAvailable());
} else {
this.taskQueue.push(taskInfo);
if (!distributed) {
// We reject if no task queue set and no more pending capacity.
if (this.options.maxQueue <= 0 && this.pendingCapacity() === 0) {
reject!(Errors.NoTaskQueueAvailable());
}
};

this._maybeDrain();
return ret;
}

// TODO(addaleax): Clean up the waitTime/runTime recording.
const now = performance.now();
this.waitTime?.record(toHistogramIntegerNano(now - taskInfo.created));
taskInfo.started = now;
workerInfo.postTask(taskInfo);
this._maybeDrain();
return ret;
}
Expand Down Expand Up @@ -639,12 +685,12 @@ class ThreadPool {
for (const workerInfo of this.workers) {
checkIfWorkerIsDone(workerInfo);

workerInfo.port.on('message', () => checkIfWorkerIsDone(workerInfo));
this.workers.onTaskDone(checkIfWorkerIsDone);
}
});

const throwOnTimeOut = async (timeout: number) => {
await sleep(timeout);
await sleep(timeout, null, { ref: false });
throw Errors.CloseTimeout();
};

Expand Down Expand Up @@ -725,6 +771,12 @@ export default class Piscina<T = any, R = any> extends EventEmitterAsyncResource
if (options.closeTimeout !== undefined && (typeof options.closeTimeout !== 'number' || options.closeTimeout < 0)) {
throw new TypeError('options.closeTimeout must be a non-negative integer');
}
if (options.loadBalancer !== undefined && (typeof options.loadBalancer !== 'function' || options.loadBalancer.length < 1)) {
throw new TypeError('options.loadBalancer must be a function with at least two args');
}
if (options.workerHistogram !== undefined && (typeof options.workerHistogram !== 'boolean')) {
throw new TypeError('options.workerHistogram must be a boolean');
}

this.#pool = new ThreadPool(this, options);
}
Expand Down Expand Up @@ -755,6 +807,7 @@ export default class Piscina<T = any, R = any> extends EventEmitterAsyncResource
return Promise.reject(
new TypeError('signal argument must be an object'));
}

return this.#pool.runTask(task, { transferList, filename, name, signal });
}

Expand Down Expand Up @@ -829,7 +882,7 @@ export default class Piscina<T = any, R = any> extends EventEmitterAsyncResource
}

// count is available as of Node.js v16.14.0 but not present in the types
const count = (this.#pool.runTime as RecordableHistogram & { count: number}).count;
const count = (this.#pool.runTime as RecordableHistogram & { count: number }).count;
if (count === 0) {
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions src/symbols.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Internal symbol used to mark Transferable objects returned
// by the Piscina.move() function
export const kMovable = Symbol('Piscina.kMovable');
export const kWorkerData = Symbol('Piscina.kWorkerData');
export const kTransferable = Symbol.for('Piscina.transferable');
export const kValue = Symbol.for('Piscina.valueOf');
export const kQueueOptions = Symbol.for('Piscina.queueOptions');
Expand Down
Loading