Skip to content

Commit

Permalink
feat: add Serialize type
Browse files Browse the repository at this point in the history
  • Loading branch information
nullndr authored and roggervalf committed Sep 13, 2024
1 parent f6546fb commit fc00af2
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 27 deletions.
54 changes: 27 additions & 27 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
RedisClient,
WorkerOptions,
} from '../interfaces';
import { MinimalQueue } from '../types';
import { MinimalQueue, Serialize } from '../types';
import {
delay,
DELAY_TIME_1,
Expand Down Expand Up @@ -55,7 +55,7 @@ export interface WorkerListener<
*
* This event is triggered when a job enters the 'active' state.
*/
active: (job: Job<DataType, ResultType, NameType>, prev: string) => void;
active: (job: Job<Serialize<DataType>, ResultType, NameType>, prev: string) => void;

/**
* Listen to 'closing' event.
Expand All @@ -77,7 +77,7 @@ export interface WorkerListener<
* This event is triggered when a job has successfully completed.
*/
completed: (
job: Job<DataType, ResultType, NameType>,
job: Job<Serialize<DataType>, ResultType, NameType>,
result: ResultType,
prev: string,
) => void;
Expand Down Expand Up @@ -106,7 +106,7 @@ export interface WorkerListener<
* reaches the stalled limit and it is deleted by the removeOnFail option.
*/
failed: (
job: Job<DataType, ResultType, NameType> | undefined,
job: Job<Serialize<DataType>, ResultType, NameType> | undefined,
error: Error,
prev: string,
) => void;
Expand All @@ -127,7 +127,7 @@ export interface WorkerListener<
* world.
*/
progress: (
job: Job<DataType, ResultType, NameType>,
job: Job<Serialize<DataType>, ResultType, NameType>,
progress: number | object,
) => void;

Expand Down Expand Up @@ -171,7 +171,7 @@ export class Worker<

private abortDelayController: AbortController | null = null;
private asyncFifoQueue: AsyncFifoQueue<void | Job<
DataType,
Serialize<DataType>,
ResultType,
NameType
>>;
Expand All @@ -187,7 +187,7 @@ export class Worker<
private _repeat: Repeat;

protected paused: Promise<void>;
protected processFn: Processor<DataType, ResultType, NameType>;
protected processFn: Processor<Serialize<DataType>, ResultType, NameType>;
protected running = false;

static RateLimitError(): Error {
Expand All @@ -196,7 +196,7 @@ export class Worker<

constructor(
name: string,
processor?: string | URL | null | Processor<DataType, ResultType, NameType>,
processor?: string | URL | null | Processor<Serialize<DataType>, ResultType, NameType>,
opts?: WorkerOptions,
Connection?: typeof RedisConnection,
) {
Expand Down Expand Up @@ -289,7 +289,7 @@ export class Worker<
useWorkerThreads: this.opts.useWorkerThreads,
});

this.processFn = sandbox<DataType, ResultType, NameType>(
this.processFn = sandbox<Serialize<DataType>, ResultType, NameType>(
processor,
this.childPool,
).bind(this);
Expand Down Expand Up @@ -348,7 +348,7 @@ export class Worker<
}

protected callProcessJob(
job: Job<DataType, ResultType, NameType>,
job: Job<Serialize<DataType>, ResultType, NameType>,
token: string,
): Promise<ResultType> {
return this.processFn(job, token);
Expand All @@ -357,9 +357,9 @@ export class Worker<
protected createJob(
data: JobJsonRaw,
jobId: string,
): Job<DataType, ResultType, NameType> {
): Job<Serialize<DataType>, ResultType, NameType> {
return this.Job.fromJSON(this as MinimalQueue, data, jobId) as Job<
DataType,
Serialize<DataType>,
ResultType,
NameType
>;
Expand Down Expand Up @@ -423,7 +423,7 @@ export class Worker<
this.startLockExtenderTimer(jobsInProgress);

const asyncFifoQueue = (this.asyncFifoQueue =
new AsyncFifoQueue<void | Job<DataType, ResultType, NameType>>());
new AsyncFifoQueue<void | Job<Serialize<DataType>, ResultType, NameType>>());

let tokenPostfix = 0;

Expand All @@ -450,7 +450,7 @@ export class Worker<
const token = `${this.id}:${tokenPostfix++}`;

const fetchedJob = this.retryIfFailed<void | Job<
DataType,
Serialize<DataType>,
ResultType,
NameType
>>(
Expand Down Expand Up @@ -484,18 +484,18 @@ export class Worker<

// Since there can be undefined jobs in the queue (when a job fails or queue is empty)
// we iterate until we find a job.
let job: Job<DataType, ResultType, NameType> | void;
let job: Job<Serialize<DataType>, ResultType, NameType> | void;
do {
job = await asyncFifoQueue.fetch();
} while (!job && asyncFifoQueue.numQueued() > 0);

if (job) {
const token = job.token;
asyncFifoQueue.add(
this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
this.retryIfFailed<void | Job<Serialize<DataType>, ResultType, NameType>>(
() =>
this.processJob(
<Job<DataType, ResultType, NameType>>job,
<Job<Serialize<DataType>, ResultType, NameType>>job,
token,
() => asyncFifoQueue.numTotal() <= this.opts.concurrency,
jobsInProgress,
Expand Down Expand Up @@ -533,7 +533,7 @@ export class Worker<
bclient: RedisClient,
token: string,
{ block = true }: GetNextJobOptions = {},
): Promise<Job<DataType, ResultType, NameType> | undefined> {
): Promise<Job<Serialize<DataType>, ResultType, NameType> | undefined> {
if (this.paused) {
if (block) {
await this.paused;
Expand Down Expand Up @@ -608,7 +608,7 @@ will never work with more accuracy than 1ms. */
client: RedisClient,
token: string,
name?: string,
): Promise<Job<DataType, ResultType, NameType>> {
): Promise<Job<Serialize<DataType>, ResultType, NameType>> {
const [jobData, id, limitUntil, delayUntil] =
await this.scripts.moveToActive(client, token, name);
this.updateDelays(limitUntil, delayUntil);
Expand Down Expand Up @@ -719,7 +719,7 @@ will never work with more accuracy than 1ms. */
jobData?: JobJsonRaw,
jobId?: string,
token?: string,
): Promise<Job<DataType, ResultType, NameType>> {
): Promise<Job<Serialize<DataType>, ResultType, NameType>> {
if (!jobData) {
if (!this.drained) {
this.emit('drained');
Expand All @@ -740,11 +740,11 @@ will never work with more accuracy than 1ms. */
}

async processJob(
job: Job<DataType, ResultType, NameType>,
job: Job<Serialize<DataType>, ResultType, NameType>,
token: string,
fetchNextCallback = () => true,
jobsInProgress: Set<{ job: Job; ts: number }>,
): Promise<void | Job<DataType, ResultType, NameType>> {
): Promise<void | Job<Serialize<DataType>, ResultType, NameType>> {
if (!job || this.closing || this.paused) {
return;
}
Expand Down Expand Up @@ -1051,10 +1051,10 @@ will never work with more accuracy than 1ms. */

stalled.forEach((jobId: string) => this.emit('stalled', jobId, 'active'));

const jobPromises: Promise<Job<DataType, ResultType, NameType>>[] = [];
const jobPromises: Promise<Job<Serialize<DataType>, ResultType, NameType>>[] = [];
for (let i = 0; i < failed.length; i++) {
jobPromises.push(
Job.fromId<DataType, ResultType, NameType>(
Job.fromId<Serialize<DataType>, ResultType, NameType>(
this as MinimalQueue,
failed[i],
),
Expand All @@ -1069,8 +1069,8 @@ will never work with more accuracy than 1ms. */
this.notifyFailedJobs(await Promise.all(jobPromises));
}

private notifyFailedJobs(failedJobs: Job<DataType, ResultType, NameType>[]) {
failedJobs.forEach((job: Job<DataType, ResultType, NameType>) =>
private notifyFailedJobs(failedJobs: Job<Serialize<DataType>, ResultType, NameType>[]) {
failedJobs.forEach((job: Job<Serialize<DataType>, ResultType, NameType>) =>
this.emit(
'failed',
job,
Expand All @@ -1081,7 +1081,7 @@ will never work with more accuracy than 1ms. */
}

private moveLimitedBackToWait(
job: Job<DataType, ResultType, NameType>,
job: Job<Serialize<DataType>, ResultType, NameType>,
token: string,
) {
return this.scripts.moveJobFromActiveToWait(job.id, token);
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from './job-json-sandbox';
export * from './job-options';
export * from './job-type';
export * from './repeat-strategy';
export * from './serialize';
146 changes: 146 additions & 0 deletions src/types/serialize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* The `Serialize` type emulates the operation `JSON.parse(JSON.stringify(x))` done
* when passing the `data` to a `Job` in a `Worker`.
*/
export type Serialize<T> =
/**
* If the input is any then the output must as well be any
*/
IsAny<T> extends true
? any
: /**
* Under the hood, JSON.stringify calls the `toJSON()` method on his parameter.
*/
T extends { toJSON(): infer U }
? U extends JsonValue
? U
: unknown
: /**
* Primitives
*/
T extends JsonPrimitive
? T
: /**
* Primitive wrappers
*/
T extends String
? string
: T extends Number
? number
: T extends Boolean
? boolean
: /**
* JSON.stringify returns always `{}` for a `Promise`
*/
T extends Promise<unknown>
? '{}'
: /**
* Map
*/
T extends Map<unknown, unknown>
? EmptyObject
: /**
* Set
*/
T extends Set<unknown>
? EmptyObject
: /**
* Array views
*/
T extends TypedArray
? Record<string, number>
: /**
* Some object can't be serialized, so we remove them.
*/
T extends NotJson
? never
: /**
* Arrays
*/
T extends []
? []
: T extends readonly [infer F, ...infer R]
? [NeverToNull<Serialize<F>>, ...Serialize<R>]
: T extends readonly unknown[]
? Array<NeverToNull<Serialize<T[number]>>>
: /**
* Objects
*/
T extends Record<keyof unknown, unknown>
? Prettify<SerializeObject<T>>
: /**
* Unknown
*/
unknown extends T
? unknown
: never;

/**
* Some utils.
*/

type NotJson = undefined | symbol | ((...args: any[]) => unknown);

// value is always not JSON => true
// value is always JSON => false
// value is somtimes JSON, sometimes not JSON => boolean
// note: cannot be inlined as logic requires union distribution
type ValueIsNotJson<T> = T extends NotJson ? true : false;

// note: remove optionality so that produced values are never `undefined`,
// only `true`, `false`, or `boolean`
type IsNotJson<T> = { [K in keyof T]-?: ValueIsNotJson<T[K]> };

type SerializeValues<T> = { [K in keyof T]: Serialize<T[K]> };

type SerializeObject<T extends Record<keyof unknown, unknown>> =
// required
{
[K in keyof T as unknown extends T[K]
? never
: IsNotJson<T>[K] extends false
? K
: never]: SerializeValues<T>[K];
} & {
// optional
[K in keyof T as unknown extends T[K]
? K
: // if the value is always JSON, then it's not optional
IsNotJson<T>[K] extends false
? never
: // if the value is always not JSON, omit it entirely
IsNotJson<T>[K] extends true
? never
: // if the value is mixed, then it's optional
K]?: SerializeValues<T>[K];
};

type JsonPrimitive = string | number | boolean | null;

type JsonArray = JsonValue[] | readonly JsonValue[];

type JsonObject = { [K in string]: JsonValue } & { [K in string]?: JsonValue };

type JsonValue = JsonPrimitive | JsonObject | JsonArray;

type TypedArray =
| Int8Array
| Uint8Array
| Uint8ClampedArray
| Int16Array
| Uint16Array
| Int32Array
| Uint32Array
| Float32Array
| Float64Array
| BigInt64Array
| BigUint64Array;

type Prettify<T> = { [K in keyof T]: T[K] } & {};

type NeverToNull<T> = [T] extends [never] ? null : T;

declare const emptyObjectSymbol: unique symbol;
type EmptyObject = { [emptyObjectSymbol]?: never };

type IsAny<T> = 0 extends 1 & T ? true : false;

0 comments on commit fc00af2

Please sign in to comment.