From 86d736cf4c239d20e1a403d11a82b7ead0611aa8 Mon Sep 17 00:00:00 2001 From: Carlos Fuentes Date: Fri, 10 Jan 2025 10:13:32 +0100 Subject: [PATCH] feat(#305)!: Expose new `PiscinaHistogram` abstraction (#723) --- docs/docs/api-reference/class.md | 84 ++++++++++++++++++++++++ src/common.ts | 33 ---------- src/histogram.ts | 108 +++++++++++++++++++++++++++++++ src/index.ts | 71 +++++++++++--------- src/types.ts | 23 ------- src/worker_pool/index.ts | 10 +-- test/histogram.ts | 46 ++++--------- 7 files changed, 249 insertions(+), 126 deletions(-) create mode 100644 src/histogram.ts diff --git a/docs/docs/api-reference/class.md b/docs/docs/api-reference/class.md index e53318ce..2e75a003 100644 --- a/docs/docs/api-reference/class.md +++ b/docs/docs/api-reference/class.md @@ -110,6 +110,90 @@ Use caution when setting resource limits. Setting limits that are too low may result in the `Piscina` worker threads being unusable. ::: +## `PiscinaHistogram` + +The `PiscinaHistogram` allows you to access the histogram data for the pool of worker threads. +It can be reset upon request in case of a need to clear the data. 
+ +**Example**: + +```js +import { Piscina } from 'piscina'; + +const pool = new Piscina({ + filename: resolve(__dirname, 'path/to/worker.js'), +}); + +const firstBatch = []; + +for (let n = 0; n < 10; n++) { + firstBatch.push(pool.run('42')); +} + +await Promise.all(firstBatch); + +console.log(pool.histogram.runTime); // Print run time histogram summary +console.log(pool.histogram.waitTime); // Print wait time histogram summary + +// If in need to reset the histogram data for a new set of tasks +pool.histogram.resetRunTime(); +pool.histogram.resetWaitTime(); + +const secondBatch = []; + +for (let n = 0; n < 10; n++) { + secondBatch.push(pool.run('42')); +} + +await Promise.all(secondBatch); + +// The histogram data will only contain the data for the second batch of tasks +console.log(pool.histogram.runTime); +console.log(pool.histogram.waitTime); +``` + +### Interface: `PiscinaHistogram` + +- `runTime`: (`PiscinaHistogramSummary`) Run Time Histogram Summary. Time taken to execute a task. +- `waitTime`: (`PiscinaHistogramSummary`) Wait Time Histogram Summary. Time between a task being submitted and the task starting to run. + +> **Note**: The histogram data is only available if `recordTiming` is set to `true`. 
+ +```ts +type PiscinaHistogram = { + runTime: PiscinaHistogramSummary; + waitTime: PiscinaHistogramSummary; + resetRunTime(): void; // Reset Run Time Histogram + resetWaitTime(): void; // Reset Wait Time Histogram +``` + +### Interface: `PiscinaHistogramSummary` + +```ts +type PiscinaHistogramSummary = { + average: number; + mean: number; + stddev: number; + min: number; + max: number; + p0_001: number; + p0_01: number; + p0_1: number; + p1: number; + p2_5: number; + p10: number; + p25: number; + p50: number; + p75: number; + p90: number; + p97_5: number; + p99: number; + p99_9: number; + p99_99: number; + p99_999: number; +} +``` + ## `PiscinaLoadBalancer` The `PiscinaLoadBalancer` interface is used to implement custom load balancing algorithm that determines which worker thread should be assigned a task. diff --git a/src/common.ts b/src/common.ts index 4ce8f2aa..4c49cce1 100644 --- a/src/common.ts +++ b/src/common.ts @@ -1,8 +1,6 @@ -import type { Histogram } from 'node:perf_hooks'; import { fileURLToPath, URL } from 'node:url'; import { availableParallelism } from 'node:os'; -import type { HistogramSummary } from './types'; import { kMovable, kTransferable, kValue } from './symbols'; // States wether the worker is ready to receive tasks @@ -52,37 +50,6 @@ export const commonState = { workerData: undefined }; -export function createHistogramSummary (histogram: Histogram): HistogramSummary { - const { mean, stddev, min, max } = histogram; - - return { - average: mean / 1000, - mean: mean / 1000, - stddev, - min: min / 1000, - max: max / 1000, - p0_001: histogram.percentile(0.001) / 1000, - p0_01: histogram.percentile(0.01) / 1000, - p0_1: histogram.percentile(0.1) / 1000, - p1: histogram.percentile(1) / 1000, - p2_5: histogram.percentile(2.5) / 1000, - p10: histogram.percentile(10) / 1000, - p25: histogram.percentile(25) / 1000, - p50: histogram.percentile(50) / 1000, - p75: histogram.percentile(75) / 1000, - p90: histogram.percentile(90) / 1000, - p97_5: 
histogram.percentile(97.5) / 1000, - p99: histogram.percentile(99) / 1000, - p99_9: histogram.percentile(99.9) / 1000, - p99_99: histogram.percentile(99.99) / 1000, - p99_999: histogram.percentile(99.999) / 1000 - }; -} - -export function toHistogramIntegerNano (milliseconds: number): number { - return Math.max(1, Math.trunc(milliseconds * 1000)); -} - export function maybeFileURLToPath (filename : string) : string { return filename.startsWith('file:') ? fileURLToPath(new URL(filename)) diff --git a/src/histogram.ts b/src/histogram.ts new file mode 100644 index 00000000..1c3c7100 --- /dev/null +++ b/src/histogram.ts @@ -0,0 +1,108 @@ +import { RecordableHistogram, createHistogram } from 'node:perf_hooks'; + +export type PiscinaHistogramSummary = { + average: number; + mean: number; + stddev: number; + min: number; + max: number; + p0_001: number; + p0_01: number; + p0_1: number; + p1: number; + p2_5: number; + p10: number; + p25: number; + p50: number; + p75: number; + p90: number; + p97_5: number; + p99: number; + p99_9: number; + p99_99: number; + p99_999: number; +}; + +export type PiscinaHistogram = { + runTime: PiscinaHistogramSummary; + waitTime: PiscinaHistogramSummary; + resetRunTime(): void; + resetWaitTime(): void; +}; + +export class PiscinaHistogramHandler { + #runTime: RecordableHistogram; + #waitTime: RecordableHistogram; + + constructor() { + this.#runTime = createHistogram(); + this.#waitTime = createHistogram(); + } + + get runTimeSummary(): PiscinaHistogramSummary { + return PiscinaHistogramHandler.createHistogramSummary(this.#runTime); + } + + get waitTimeSummary(): PiscinaHistogramSummary { + return PiscinaHistogramHandler.createHistogramSummary(this.#waitTime); + } + + get runTimeCount(): number { + return this.#runTime.count; + } + + get waitTimeCount(): number { + return this.#waitTime.count; + } + + recordRunTime(value: number) { + this.#runTime.record(PiscinaHistogramHandler.toHistogramIntegerNano(value)); + } + + recordWaitTime(value: 
number) { + this.#waitTime.record( + PiscinaHistogramHandler.toHistogramIntegerNano(value) + ); + } + + resetWaitTime(): void { + this.#waitTime.reset(); + } + + resetRunTime(): void { + this.#runTime.reset(); + } + + static createHistogramSummary( + histogram: RecordableHistogram + ): PiscinaHistogramSummary { + const { mean, stddev, min, max } = histogram; + + return { + average: mean / 1000, + mean: mean / 1000, + stddev, + min: min / 1000, + max: max / 1000, + p0_001: histogram.percentile(0.001) / 1000, + p0_01: histogram.percentile(0.01) / 1000, + p0_1: histogram.percentile(0.1) / 1000, + p1: histogram.percentile(1) / 1000, + p2_5: histogram.percentile(2.5) / 1000, + p10: histogram.percentile(10) / 1000, + p25: histogram.percentile(25) / 1000, + p50: histogram.percentile(50) / 1000, + p75: histogram.percentile(75) / 1000, + p90: histogram.percentile(90) / 1000, + p97_5: histogram.percentile(97.5) / 1000, + p99: histogram.percentile(99) / 1000, + p99_9: histogram.percentile(99.9) / 1000, + p99_99: histogram.percentile(99.99) / 1000, + p99_999: histogram.percentile(99.999) / 1000, + }; + } + + static toHistogramIntegerNano(milliseconds: number): number { + return Math.max(1, Math.trunc(milliseconds * 1000)); + } +} diff --git a/src/index.ts b/src/index.ts index c43f6d2f..8bab56b2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,7 +2,7 @@ import { Worker, MessageChannel, MessagePort } from 'node:worker_threads'; import { once, EventEmitterAsyncResource } from 'node:events'; import { resolve } from 'node:path'; import { inspect, types } from 'node:util'; -import { RecordableHistogram, createHistogram, performance } from 'node:perf_hooks'; +import { performance } from 'node:perf_hooks'; import { setTimeout as sleep } from 'node:timers/promises'; import assert from 'node:assert'; @@ -13,7 +13,6 @@ import type { Transferable, ResourceLimits, EnvSpecifier, - HistogramSummary } from './types'; import { kQueueOptions, @@ -44,14 +43,16 @@ import { AbortError, onabort } 
from './abort'; +import { + PiscinaHistogram, + PiscinaHistogramHandler, +} from './histogram'; import { Errors } from './errors'; import { READY, commonState, isTransferable, markMovable, - createHistogramSummary, - toHistogramIntegerNano, getAvailableParallelism, maybeFileURLToPath } from './common'; @@ -171,8 +172,7 @@ class ThreadPool { taskQueue : TaskQueue; skipQueue : TaskInfo[] = []; completed : number = 0; - runTime? : RecordableHistogram; - waitTime? : RecordableHistogram; + histogram: PiscinaHistogramHandler | null = null; _needsDrain : boolean; start : number = performance.now(); inProcessPendingMessages : boolean = false; @@ -192,8 +192,7 @@ class ThreadPool { this.options = { ...kDefaultOptions, ...options, filename, maxQueue: 0 }; if (this.options.recordTiming) { - this.runTime = createHistogram(); - this.waitTime = createHistogram(); + this.histogram = new PiscinaHistogramHandler(); } // The >= and <= could be > and < but this way we get 100 % coverage 🙃 @@ -458,7 +457,7 @@ class ThreadPool { // Seeking for a real worker instead of customized one if (candidate != null && candidate[kWorkerData] != null) { const now = performance.now(); - this.waitTime?.record(toHistogramIntegerNano(now - task.created)); + this.histogram?.recordWaitTime(now - task.created) task.started = now; candidate[kWorkerData].postTask(task); this._maybeDrain(); @@ -518,7 +517,7 @@ class ThreadPool { (err : Error | null, result : any) => { this.completed++; if (taskInfo.started) { - this.runTime?.record(toHistogramIntegerNano(performance.now() - taskInfo.started)); + this.histogram?.recordRunTime(performance.now() - taskInfo.started); } if (err !== null) { reject(err); @@ -718,6 +717,7 @@ class ThreadPool { export default class Piscina extends EventEmitterAsyncResource { #pool : ThreadPool; + #histogram: PiscinaHistogram | null = null; constructor (options : Options = {}) { super({ ...options, name: 'Piscina' }); @@ -867,34 +867,45 @@ export default class Piscina extends 
EventEmitterAsyncResource return this.#pool.completed; } - get waitTime () : HistogramSummary | null { - if (!this.#pool.waitTime) { - return null; - } - - return createHistogramSummary(this.#pool.waitTime); - } - - get runTime () : any { - if (!this.#pool.runTime) { - return null; - } + get histogram () : PiscinaHistogram { + if (this.#histogram == null) { + const piscinahistogram = { + // @ts-expect-error + get runTime() { return this.histogram?.runTimeSummary! }, + // @ts-expect-error + get waitTime() { return this.histogram?.waitTimeSummary! }, + resetRunTime() { + // @ts-expect-error + this.histogram?.resetRunTime() + }, + resetWaitTime() { + // @ts-expect-error + this.histogram?.resetWaitTime() + }, + } + + Object.defineProperty(piscinahistogram, 'histogram', { + value: this.#pool.histogram, + writable: false, + enumerable: false, + configurable: false, + }) + + this.#histogram = piscinahistogram; + }; - return createHistogramSummary(this.#pool.runTime); + return this.#histogram; } get utilization () : number { - if (!this.#pool.runTime) { + if (this.#pool.histogram == null) { return 0; } // count is available as of Node.js v16.14.0 but not present in the types - const count = (this.#pool.runTime as RecordableHistogram & { count: number }).count; - if (count === 0) { - return 0; - } + const count = this.#pool.histogram.runTimeCount; - if (!this.#pool.runTime) { + if (count === 0) { return 0; } @@ -903,7 +914,7 @@ export default class Piscina extends EventEmitterAsyncResource // of time the pool has been running multiplied by the // maximum number of threads. 
const capacity = this.duration * this.#pool.options.maxThreads; - const totalMeanRuntime = (this.#pool.runTime.mean / 1000) * count; + const totalMeanRuntime = (this.#pool.histogram.runTimeSummary.mean / 1000) * count; // We calculate the appoximate pool utilization by multiplying // the mean run time of all tasks by the number of runtime diff --git a/src/types.ts b/src/types.ts index c6c4a997..9b55bdf5 100644 --- a/src/types.ts +++ b/src/types.ts @@ -40,29 +40,6 @@ export interface Transferable { readonly [kValue]: object; } -export interface HistogramSummary { - average: number; - mean: number; - stddev: number; - min: number; - max: number; - p0_001: number; - p0_01: number; - p0_1: number; - p1: number; - p2_5: number; - p10: number; - p25: number; - p50: number; - p75: number; - p90: number; - p97_5: number; - p99: number; - p99_9: number; - p99_99: number; - p99_999: number; -} - export type ResourceLimits = Worker extends { resourceLimits?: infer T; } diff --git a/src/worker_pool/index.ts b/src/worker_pool/index.ts index bf4d7642..199b1811 100644 --- a/src/worker_pool/index.ts +++ b/src/worker_pool/index.ts @@ -2,12 +2,12 @@ import { Worker, MessagePort, receiveMessageOnPort } from 'node:worker_threads'; import { createHistogram, RecordableHistogram } from 'node:perf_hooks'; import assert from 'node:assert'; -import { HistogramSummary, RequestMessage, ResponseMessage } from '../types'; +import { RequestMessage, ResponseMessage } from '../types'; import { Errors } from '../errors'; import { TaskInfo } from '../task_queue'; import { kFieldCount, kRequestCountField, kResponseCountField, kWorkerData } from '../symbols'; -import { createHistogramSummary, toHistogramIntegerNano } from '../common'; +import { PiscinaHistogramHandler, PiscinaHistogramSummary } from '../histogram'; import { AsynchronouslyCreatedResource, AsynchronouslyCreatedResourcePool } from './base'; export * from './balancer'; @@ -18,7 +18,7 @@ export type PiscinaWorker = { id: number; 
currentUsage: number; isRunningAbortableTask: boolean; - histogram: HistogramSummary | null; + histogram: PiscinaHistogramSummary | null; terminating: boolean; destroyed: boolean; [kWorkerData]: WorkerInfo; @@ -96,7 +96,7 @@ export class WorkerInfo extends AsynchronouslyCreatedResource { _handleResponse (message : ResponseMessage) : void { if (message.time != null) { - this.histogram?.record(toHistogramIntegerNano(message.time)); + this.histogram?.record(PiscinaHistogramHandler.toHistogramIntegerNano(message.time)); } this.onMessage(message); @@ -184,7 +184,7 @@ export class WorkerInfo extends AsynchronouslyCreatedResource { return worker.isRunningAbortableTask(); }, get histogram () { - return worker.histogram != null ? createHistogramSummary(worker.histogram) : null; + return worker.histogram != null ? PiscinaHistogramHandler.createHistogramSummary(worker.histogram) : null; }, get terminating () { return worker.terminating; diff --git a/test/histogram.ts b/test/histogram.ts index 5cc7c07a..82392f29 100644 --- a/test/histogram.ts +++ b/test/histogram.ts @@ -14,7 +14,8 @@ test('pool will maintain run and wait time histograms by default', async ({ equa } await Promise.all(tasks); - const waitTime = pool.waitTime as any; + const histogram = pool.histogram; + const waitTime = histogram.waitTime; ok(waitTime); equal(typeof waitTime.average, 'number'); equal(typeof waitTime.mean, 'number'); @@ -22,13 +23,15 @@ test('pool will maintain run and wait time histograms by default', async ({ equa equal(typeof waitTime.min, 'number'); equal(typeof waitTime.max, 'number'); - const runTime = pool.runTime as any; + const runTime = histogram.runTime; ok(runTime); equal(typeof runTime.average, 'number'); equal(typeof runTime.mean, 'number'); equal(typeof runTime.stddev, 'number'); equal(typeof runTime.min, 'number'); equal(typeof runTime.max, 'number'); + equal(typeof histogram.resetRunTime, 'function'); + equal(typeof histogram.resetWaitTime, 'function'); }); test('pool will 
maintain run and wait time histograms when recordTiming is true', async ({ ok }) => { @@ -43,14 +46,14 @@ test('pool will maintain run and wait time histograms when recordTiming is true' } await Promise.all(tasks); - const waitTime = pool.waitTime as any; + const waitTime = pool.histogram.waitTime; ok(waitTime); - const runTime = pool.runTime as any; + const runTime = pool.histogram.runTime; ok(runTime); }); -test('pool does not maintain run and wait time histograms when recordTiming is false', async ({ equal }) => { +test('pool does not maintain run and wait time histograms when recordTiming is false', async ({ notOk }) => { const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/eval.js'), recordTiming: false @@ -62,8 +65,8 @@ test('pool does not maintain run and wait time histograms when recordTiming is f } await Promise.all(tasks); - equal(pool.waitTime, null); - equal(pool.runTime, null); + notOk(pool.histogram.waitTime); + notOk(pool.histogram.runTime); }); test('workers has histogram', async t => { @@ -136,34 +139,7 @@ test('workers does not have histogram if disabled', async t => { await Promise.all(tasks); }); -// test('histogram of worker should be initialized with max concurrent task set as min', { only: true }, async t => { -// // After each task the balancer is called to distribute the next task -// // The first task is distributed, the second is enqueued, once the first is done, the second is distributed and normalizes -// let counter = 0; -// const pool = new Piscina({ -// filename: resolve(__dirname, 'fixtures/eval.js'), -// maxThreads: 2, -// concurrentTasksPerWorker: 1, -// workerHistogram: true, -// }); -// const tasks = []; - -// t.plan(10 * 2); -// pool.on('workerCreate', worker => { -// if (counter === 0) { -// t.equal(worker.histogram.min, 0); -// } else { -// t.equal(worker.histogram.min, 1); -// } -// }) - -// for (let n = 0; n < 10; n++) { -// tasks.push(pool.run('new Promise(resolve => setTimeout(resolve, 500))')); -// } -// 
await Promise.all(tasks); -// }); - -test('opts.workerHistogram should be a boolean value', async t => { +test('opts.workerHistogram should be a boolean value', t => { let index = 0; t.plan(1); t.throws(() => {