diff --git a/packages/sidetrack/.kanelrc.js b/packages/sidetrack/.kanelrc.js index 440754c9..2b5de4b4 100644 --- a/packages/sidetrack/.kanelrc.js +++ b/packages/sidetrack/.kanelrc.js @@ -1,23 +1,14 @@ -const path = require("path"); -const { resolveType } = require("kanel"); - +const { defaultGenerateIdentifierType } = require("kanel"); /** @type {import('kanel').Config} */ module.exports = { connection: { connectionString: process.env.DATABASE_URL }, generateIdentifierType: (c, d, config) => { - // Id columns are already prefixed with the table name, so we don't need to add it here - const name = "Id"; - const innerType = resolveType(c, d, { - ...config, - generateIdentifierType: undefined, - }); - return { - comment: [], - declarationType: "typeDeclaration", - exportAs: "named", - name, - typeDefinition: [innerType], - }; + const defaultResult = defaultGenerateIdentifierType(c, d, config); + // Remove the brand from the type definition + defaultResult.typeDefinition = [ + defaultResult.typeDefinition[0].split(" & ")[0], + ]; + return defaultResult; }, outputPath: "./src/models/generated", preDeleteOutputFolder: true, diff --git a/packages/sidetrack/migrations/2.ts b/packages/sidetrack/migrations/2.ts new file mode 100644 index 00000000..0ff56e1b --- /dev/null +++ b/packages/sidetrack/migrations/2.ts @@ -0,0 +1,32 @@ +import { MigrationBuilder } from "@sidetrack/pg-migrate"; + +export default { + up: async (pgm: MigrationBuilder) => { + pgm.sql(` + + CREATE TYPE sidetrack_cron_job_status_enum AS ENUM ( + 'active', + 'inactive' + ); + + CREATE TABLE IF NOT EXISTS sidetrack_cron_jobs ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + queue TEXT NOT NULL, + cron_expression TEXT NOT NULL, + payload JSONB NOT NULL, + status sidetrack_cron_job_status_enum NOT NULL DEFAULT 'active', + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + UNIQUE(queue, cron_expression) + ); + + ALTER TABLE sidetrack_jobs ADD COLUMN cron_job_id uuid REFERENCES sidetrack_cron_jobs(id); + + ALTER TABLE sidetrack_jobs ADD COLUMN unique_key TEXT; + + CREATE UNIQUE INDEX ON sidetrack_jobs (unique_key); + `); + }, + down: async (_pgm: MigrationBuilder) => {}, + name: "2", +}; diff --git a/packages/sidetrack/src/effect.ts b/packages/sidetrack/src/effect.ts index cd4c5996..5c3148f5 100644 --- a/packages/sidetrack/src/effect.ts +++ b/packages/sidetrack/src/effect.ts @@ -6,6 +6,8 @@ import * as Layer from "effect/Layer"; import { fromIterableWith } from "effect/Record"; import * as Ref from "effect/Ref"; import * as Schedule from "effect/Schedule"; +import * as Record from "effect/Record"; +import * as Stream from "effect/Stream"; import pg from "pg"; import { SidetrackDatabaseClient, usePg } from "./client"; @@ -25,7 +27,12 @@ import { SidetrackQueuesGenericType, SidetrackRunJobOptions, SidetrackRunJobsOptions, + SidetrackCronJobOptions, + SidetrackDeactivateCronScheduleOptions, + SidetrackDeleteCronScheduleOptions, } from "./types"; +import { Cron } from "effect"; +import SidetrackCronJobs from "./models/generated/public/SidetrackCronJobs"; export interface SidetrackService { cancelJob: ( @@ -54,6 +61,40 @@ export interface SidetrackService { * Turn off polling */ stop: () => Effect.Effect; + /** + * Schedule a cron job on a queue + * @param queueName - The queue to schedule the cron job on + * @param cronExpression - A 5 part cron expression + */ + scheduleCron: ( + queueName: K, + cronExpression: string, + payload: Queues[K], + options?: SidetrackCronJobOptions, + ) => Effect.Effect; + + /** + * Deactivate a cron schedule. This prevents the cron schedule from creating new jobs. + * @param queueName - The queue to deactivate the cron job from + * @param cronExpression - The cron expression to deactivate + */ + deactivateCronSchedule: ( + queueName: K, + cronExpression: string, + options?: SidetrackDeactivateCronScheduleOptions, + ) => Effect.Effect; + + /** + * Delete a cron schedule. This removes the cron job from the database. + * @param queueName - The queue to delete the cron job from + * @param cronExpression - The cron expression to delete + */ + deleteCronSchedule: ( + queueName: K, + cronExpression: string, + options?: SidetrackDeleteCronScheduleOptions, + ) => Effect.Effect; + /** * Utilities meant to be used with tests only */ @@ -117,6 +158,11 @@ export function makeLayer( Fiber.void, ); + // TODO should we use a hashmap or supervisor? We'll need to convert this layer into an Effect + const cronFibers = Ref.unsafeMake( + Record.empty>(), + ); + const startPolling = () => Effect.promise(() => dbClient.execute( @@ -176,7 +222,9 @@ export function makeLayer( ); return Effect.promise(() => runMigrations(databaseOptions.connectionString), - ).pipe(Effect.flatMap(() => startPolling())); + ) + .pipe(Effect.flatMap(() => startPolling())) + .pipe(Effect.flatMap(() => startCronSchedules())); }; const cancelJob = (jobId: string, options?: SidetrackCancelJobOptions) => @@ -187,6 +235,7 @@ export function makeLayer( ), ).pipe(Effect.asVoid); + // TODO should we return the deleted job or the number of rows deleted? There is a difference between a job actually being deleted and a job not being found const deleteJob = (jobId: string, options?: SidetrackDeleteJobOptions) => Effect.promise(() => (options?.dbClient || dbClient).execute( @@ -198,6 +247,14 @@ export function makeLayer( const stop = () => Ref.get(pollingFiber) .pipe(Effect.flatMap((fiber) => Fiber.interrupt(fiber))) + .pipe( + Effect.flatMap(() => Ref.get(cronFibers)), + Effect.flatMap((fibers) => + Effect.forEach(Record.values(fibers), (fiber) => + Fiber.interrupt(fiber), + ), + ), + ) .pipe(Effect.asVoid); const runHandler = ( @@ -287,13 +344,21 @@ export function makeLayer( payload, current_attempt, max_attempts, - scheduled_at - ) VALUES ('scheduled', $1, $2, 0, $3, $4) RETURNING *`, + scheduled_at, + unique_key + ) VALUES ('scheduled', $1, $2, 0, $3, $4, $5) + ${ + options?.suppressDuplicateUniqueKeyErrors + ? "ON CONFLICT (unique_key) DO NOTHING" + : "" + } + RETURNING *`, [ queueName, payload, queues[queueName].options?.maxAttempts ?? 1, options?.scheduledAt ?? new Date(), + options?.uniqueKey, ], ), ).pipe(Effect.map((result) => result.rows[0])); @@ -306,6 +371,104 @@ export function makeLayer( ), ).pipe(Effect.map((result) => result.rows[0])); + const scheduleCron = ( + queueName: K, + cronExpression: string, + payload: Queues[K], + options?: SidetrackCronJobOptions, + ) => + Cron.parse(cronExpression) + .pipe( + Effect.flatMap((_cron) => + Effect.promise(() => + (options?.dbClient || dbClient).execute( + `INSERT INTO sidetrack_cron_jobs (queue, cron_expression, payload) + VALUES ($1, $2, $3) + ON CONFLICT (queue, cron_expression) DO UPDATE + SET payload = $3 RETURNING *`, + [queueName, cronExpression, payload], + ), + ), + ), + ) + + .pipe( + Effect.map((result) => result.rows[0]), + Effect.tap((cronJob) => startCronJob(cronJob, options)), + ); + + /** + * @internal + */ + const startCronSchedules = () => + Effect.promise(() => + dbClient.execute( + `SELECT * FROM sidetrack_cron_jobs`, + ), + ).pipe( + Effect.flatMap((result) => + Effect.forEach( + result.rows, + (cronJob) => startCronJob(cronJob, cronJob.payload as any), + { concurrency: "inherit" }, + ), + ), + ); + + /** + * @internal + */ + const startCronJob = ( + cronJob: SidetrackCronJobs, + options?: SidetrackCronJobOptions, + ) => { + // This grabs the interval within which the cron job is running, and uses that as a unique key so multiple cron jobs on the same schedule don't conflict + // alternatively, we could explore using cron.next and just using Effect.schedule instead of draining a stream + return Stream.fromSchedule(Schedule.cron(cronJob.cron_expression)).pipe( + Stream.mapEffect((value) => + insertJob(cronJob.queue, cronJob.payload as any, { + ...options, + uniqueKey: value.toString(), + suppressDuplicateUniqueKeyErrors: true, + }), + ), + Stream.catchAllCause(Effect.logError), + Stream.runDrain, + Effect.forkDaemon, + Effect.flatMap((fiber) => + Ref.update(cronFibers, (fibers) => + Record.set(fibers, cronJob.id, fiber), + ), + ), + ); + }; + + // TODO should we return the updated cron job or the number of rows updated? There is a difference between a cron job actually being updated and a cron job not being found + const deactivateCronSchedule = ( + queueName: K, + cronExpression: string, + options?: SidetrackDeactivateCronScheduleOptions, + ) => + Effect.promise(() => + (options?.dbClient || dbClient).execute( + `UPDATE sidetrack_cron_jobs SET status = 'inactive' WHERE queue = $1 AND cron_expression = $2`, + [queueName, cronExpression], + ), + ).pipe(Effect.asVoid); + + // TODO should we return the deleted cron job or the number of rows deleted? There is a difference between a cron job actually being deleted and a cron job not being found + const deleteCronSchedule = ( + queueName: K, + cronExpression: string, + options?: SidetrackDeleteCronScheduleOptions, + ) => + Effect.promise(() => + (options?.dbClient || dbClient).execute( + `DELETE FROM sidetrack_cron_jobs WHERE queue = $1 AND cron_expression = $2`, + [queueName, cronExpression], + ), + ).pipe(Effect.asVoid); + const runJob = (jobId: string, options?: SidetrackRunJobOptions) => Effect.promise(() => (options?.dbClient || dbClient).execute( @@ -432,12 +595,15 @@ export function makeLayer( insertJob, start, stop, + deactivateCronSchedule, + deleteCronSchedule, testUtils: { listJobStatuses, listJobs, runJob, - runJobs: runJobs, + runJobs, }, + scheduleCron, }; }); } diff --git a/packages/sidetrack/src/index.ts b/packages/sidetrack/src/index.ts index 8b7cde23..4bef43cd 100644 --- a/packages/sidetrack/src/index.ts +++ b/packages/sidetrack/src/index.ts @@ -12,6 +12,9 @@ import SidetrackJobStatusEnum from "./models/generated/public/SidetrackJobStatus import { makeAppRuntime } from "./runtime"; import { SidetrackCancelJobOptions, + SidetrackCronJobOptions, + SidetrackDeactivateCronScheduleOptions, + SidetrackDeleteCronScheduleOptions, SidetrackDeleteJobOptions, SidetrackGetJobOptions, SidetrackInsertJobOptions, @@ -22,6 +25,7 @@ import { SidetrackRunJobOptions, SidetrackRunJobsOptions, } from "./types"; +import SidetrackCronJobs from "./models/generated/public/SidetrackCronJobs"; /** * Main class that contains all the primary methods for interacting with Sidetrack @@ -89,6 +93,58 @@ export class Sidetrack { ); } + /** + * Schedule a cron job on a queue + * @param queueName - The queue to schedule the cron job on + * @param cronExpression - A 5 part cron expression + */ + async scheduleCron( + queueName: K, + cronExpression: string, + payload: Queues[K], + options?: SidetrackCronJobOptions, + ): Promise { + return this.customRunPromise( + Effect.flatMap(this.sidetrackService, (service) => + service.scheduleCron(queueName, cronExpression, payload, options), + ), + ); + } + + /** + * Deactivate a cron schedule. This prevents the cron schedule from creating new jobs. + * @param queueName - The queue to deactivate the cron job from + * @param cronExpression - The cron expression to deactivate + */ + async deactivateCronSchedule( + queueName: K, + cronExpression: string, + options?: SidetrackDeactivateCronScheduleOptions, + ) { + return this.customRunPromise( + Effect.flatMap(this.sidetrackService, (service) => + service.deactivateCronSchedule(queueName, cronExpression, options), + ), + ); + } + + /** + * Delete a cron schedule. This removes the cron job from the database. + * @param queueName - The queue to delete the cron job from + * @param cronExpression - The cron expression to delete + */ + async deleteCronSchedule( + queueName: K, + cronExpression: string, + options?: SidetrackDeleteCronScheduleOptions, + ) { + return this.customRunPromise( + Effect.flatMap(this.sidetrackService, (service) => + service.deleteCronSchedule(queueName, cronExpression, options), + ), + ); + } + /** * Automatically run migrations and start polling the DB for jobs */ diff --git a/packages/sidetrack/src/migrations.ts b/packages/sidetrack/src/migrations.ts index 61406b94..41ba1787 100644 --- a/packages/sidetrack/src/migrations.ts +++ b/packages/sidetrack/src/migrations.ts @@ -2,11 +2,13 @@ import pg_migrate from "@sidetrack/pg-migrate"; import one from "../migrations/1"; +import two from "../migrations/2"; + export const runMigrations = async (connectionString: string) => pg_migrate({ databaseUrl: connectionString, dir: "migrations", direction: "up", - migrations: [one], + migrations: [one, two], migrationsTable: "sidetrack_migrations", }).then(() => {}); diff --git a/packages/sidetrack/src/models/generated/public/SidetrackCronJobs.ts b/packages/sidetrack/src/models/generated/public/SidetrackCronJobs.ts new file mode 100644 index 00000000..82449ae6 --- /dev/null +++ b/packages/sidetrack/src/models/generated/public/SidetrackCronJobs.ts @@ -0,0 +1,53 @@ +// @generated +// This file is automatically generated by Kanel. Do not modify manually. + +/** Identifier type for public.sidetrack_cron_jobs */ +export type SidetrackCronJobsId = string; + +/** Represents the table public.sidetrack_cron_jobs */ +export default interface SidetrackCronJobs { + id: SidetrackCronJobsId; + + queue: string; + + cron_expression: string; + + payload: unknown; + + created_at: Date | null; + + updated_at: Date | null; +} + +/** Represents the initializer for the table public.sidetrack_cron_jobs */ +export interface SidetrackCronJobsInitializer { + /** Default value: gen_random_uuid() */ + id?: SidetrackCronJobsId; + + queue: string; + + cron_expression: string; + + payload: unknown; + + /** Default value: now() */ + created_at?: Date | null; + + /** Default value: now() */ + updated_at?: Date | null; +} + +/** Represents the mutator for the table public.sidetrack_cron_jobs */ +export interface SidetrackCronJobsMutator { + id?: SidetrackCronJobsId; + + queue?: string; + + cron_expression?: string; + + payload?: unknown; + + created_at?: Date | null; + + updated_at?: Date | null; +} diff --git a/packages/sidetrack/src/models/generated/public/SidetrackJobs.ts b/packages/sidetrack/src/models/generated/public/SidetrackJobs.ts index 26563c21..d3986666 100644 --- a/packages/sidetrack/src/models/generated/public/SidetrackJobs.ts +++ b/packages/sidetrack/src/models/generated/public/SidetrackJobs.ts @@ -1,13 +1,15 @@ // @generated // This file is automatically generated by Kanel. Do not modify manually. -import type SidetrackJobStatusEnum from './SidetrackJobStatusEnum'; +import type { default as SidetrackJobStatusEnum } from './SidetrackJobStatusEnum'; +import type { SidetrackCronJobsId } from './SidetrackCronJobs'; -export type Id = string; +/** Identifier type for public.sidetrack_jobs */ +export type SidetrackJobsId = string; /** Represents the table public.sidetrack_jobs */ export default interface SidetrackJobs { - id: Id; + id: SidetrackJobsId; status: SidetrackJobStatusEnum; @@ -32,12 +34,14 @@ export default interface SidetrackJobs { failed_at: Date | null; completed_at: Date | null; + + cron_job_id: SidetrackCronJobsId | null; } /** Represents the initializer for the table public.sidetrack_jobs */ export interface SidetrackJobsInitializer { /** Default value: gen_random_uuid() */ - id?: Id; + id?: SidetrackJobsId; status: SidetrackJobStatusEnum; @@ -64,11 +68,13 @@ export interface SidetrackJobsInitializer { failed_at?: Date | null; completed_at?: Date | null; + + cron_job_id?: SidetrackCronJobsId | null; } /** Represents the mutator for the table public.sidetrack_jobs */ export interface SidetrackJobsMutator { - id?: Id; + id?: SidetrackJobsId; status?: SidetrackJobStatusEnum; @@ -93,4 +99,6 @@ export interface SidetrackJobsMutator { failed_at?: Date | null; completed_at?: Date | null; + + cron_job_id?: SidetrackCronJobsId | null; } diff --git a/packages/sidetrack/src/models/generated/public/SidetrackMigrations.ts b/packages/sidetrack/src/models/generated/public/SidetrackMigrations.ts index faa22c66..729df35d 100644 --- a/packages/sidetrack/src/models/generated/public/SidetrackMigrations.ts +++ b/packages/sidetrack/src/models/generated/public/SidetrackMigrations.ts @@ -1,11 +1,12 @@ // @generated // This file is automatically generated by Kanel. Do not modify manually. -export type Id = number; +/** Identifier type for public.sidetrack_migrations */ +export type SidetrackMigrationsId = number; /** Represents the table public.sidetrack_migrations */ export default interface SidetrackMigrations { - id: Id; + id: SidetrackMigrationsId; name: string; @@ -15,7 +16,7 @@ export default interface SidetrackMigrations { /** Represents the initializer for the table public.sidetrack_migrations */ export interface SidetrackMigrationsInitializer { /** Default value: nextval('sidetrack_migrations_id_seq'::regclass) */ - id?: Id; + id?: SidetrackMigrationsId; name: string; @@ -24,7 +25,7 @@ export interface SidetrackMigrationsInitializer { /** Represents the mutator for the table public.sidetrack_migrations */ export interface SidetrackMigrationsMutator { - id?: Id; + id?: SidetrackMigrationsId; name?: string; diff --git a/packages/sidetrack/src/types.ts b/packages/sidetrack/src/types.ts index 239c648e..5c5c3f8a 100644 --- a/packages/sidetrack/src/types.ts +++ b/packages/sidetrack/src/types.ts @@ -6,6 +6,20 @@ import SidetrackJobs from "./models/generated/public/SidetrackJobs"; export interface SidetrackInsertJobOptions { dbClient?: SidetrackDatabaseClient; scheduledAt?: Date; + uniqueKey?: string; + suppressDuplicateUniqueKeyErrors?: boolean; +} + +export interface SidetrackCronJobOptions { + dbClient?: SidetrackDatabaseClient; +} + +export interface SidetrackDeactivateCronScheduleOptions { + dbClient?: SidetrackDatabaseClient; +} + +export interface SidetrackDeleteCronScheduleOptions { + dbClient?: SidetrackDatabaseClient; } export interface SidetrackCancelJobOptions { diff --git a/packages/sidetrack/test/index.test.ts b/packages/sidetrack/test/index.test.ts index 452b7522..5f62e69b 100644 --- a/packages/sidetrack/test/index.test.ts +++ b/packages/sidetrack/test/index.test.ts @@ -335,4 +335,40 @@ describe("jobs", () => { expect(jobsAfterRun[0].status).toBe("completed"); expect(jobsAfterRun[0].payload).toEqual({ message: "Future job" }); }); + + it("cron job functionality works", { timeout: 80000 }, async () => { + const sidetrack = new SidetrackTest<{ + cronTest: { message: string }; + }>({ + databaseOptions: { + connectionString: process.env["DATABASE_URL"]!, + }, + queues: { + cronTest: { + handler: async (payload) => { + return payload; + }, + }, + }, + }); + + // Schedule a cron job to run every minute + // TODO this cron will start immediately, which mostly defeats the purpose of the test + // TODO We need to do some sort of specific time-based test and time-travel + await sidetrack.scheduleCron("cronTest", "* * * * *", { + message: "Cron job test", + }); + + await sidetrack.runJobs({ queue: ["cronTest"] }); + + // Check if a job was inserted and completed + const jobs = await sidetrack.listJobs({ queue: ["cronTest"] }); + expect(jobs.length).toBeGreaterThanOrEqual(1); + + const completedJobs = jobs.filter((job) => job.status === "completed"); + expect(completedJobs.length).toBeGreaterThanOrEqual(1); + + const lastCompletedJob = completedJobs[completedJobs.length - 1]; + expect(lastCompletedJob.payload).toEqual({ message: "Cron job test" }); + }); }); diff --git a/typedoc.json b/typedoc.json index 38d9fed6..006d1bc1 100644 --- a/typedoc.json +++ b/typedoc.json @@ -4,5 +4,6 @@ "includeVersion": true, "name": "Sidetrack", "out": "docs", + "excludeInternal": true, "plugin": ["typedoc-github-theme"] }