diff --git a/src/asynciterable/_sleep.ts b/src/asynciterable/_sleep.ts index 42e307c8..63e3baf8 100644 --- a/src/asynciterable/_sleep.ts +++ b/src/asynciterable/_sleep.ts @@ -1,7 +1,7 @@ import { AbortError } from '../aborterror.js'; -export function sleep(dueTime: number, signal?: AbortSignal) { - return new Promise((resolve, reject) => { +export function sleep(dueTime: number, signal?: AbortSignal, unref = false): Promise { + return new Promise((resolve, reject) => { if (signal && signal.aborted) { reject(new AbortError()); } @@ -18,6 +18,10 @@ export function sleep(dueTime: number, signal?: AbortSignal) { resolve(); }, dueTime); + if (unref && typeof id['unref'] === 'function') { + id['unref'](); + } + if (signal) { signal.addEventListener('abort', onAbort, { once: true }); } diff --git a/src/asynciterable/interval.ts b/src/asynciterable/interval.ts index 4938b62e..048fd461 100644 --- a/src/asynciterable/interval.ts +++ b/src/asynciterable/interval.ts @@ -4,17 +4,19 @@ import { throwIfAborted } from '../aborterror.js'; class IntervalAsyncIterable extends AsyncIterableX { private _dueTime: number; + private _unref: boolean; - constructor(dueTime: number) { + constructor(dueTime: number, unref: boolean) { super(); this._dueTime = dueTime; + this._unref = unref; } async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); let i = 0; while (1) { - await sleep(this._dueTime, signal); + await sleep(this._dueTime, signal, this._unref); yield i++; } } @@ -24,8 +26,9 @@ class IntervalAsyncIterable extends AsyncIterableX { * Produces a new item in an async-iterable at the given interval cycle time. * * @param {number} dueTime The due time in milliseconds to spawn a new item. + * @param {boolean} [unref=false] Whether to unref the interval timer. * @returns {AsyncIterableX} An async-iterable producing values at the specified interval. */ -export function interval(dueTime: number): AsyncIterableX { - return new IntervalAsyncIterable(dueTime); +export function interval(dueTime: number, unref = false): AsyncIterableX { + return new IntervalAsyncIterable(dueTime, unref); } diff --git a/src/asynciterable/operators/buffercountortime.ts b/src/asynciterable/operators/buffercountortime.ts index 083ddb28..cea17c1a 100644 --- a/src/asynciterable/operators/buffercountortime.ts +++ b/src/asynciterable/operators/buffercountortime.ts @@ -18,7 +18,7 @@ class BufferCountOrTime extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { const buffer: TSource[] = []; - const timer = interval(this.maxWaitTime).pipe(map(() => timerEvent)); + const timer = interval(this.maxWaitTime, true).pipe(map(() => timerEvent)); const source = concat(this.source, of(ended)); const merged = merge(source, timer); diff --git a/src/asynciterable/operators/timeout.ts b/src/asynciterable/operators/timeout.ts index 8a23493e..8467b7d9 100644 --- a/src/asynciterable/operators/timeout.ts +++ b/src/asynciterable/operators/timeout.ts @@ -60,7 +60,7 @@ export class TimeoutAsyncIterable extends AsyncIterableX { it.next().then((val) => { return { type: VALUE_TYPE, value: val }; }), - sleep(this._dueTime, signal).then(() => { + sleep(this._dueTime, signal, true).then(() => { return { type: ERROR_TYPE }; }), ]);