Skip to content

Commit

Permalink
Merge pull request #204 from sidetracklabs/initial-cron-jobs-support
Browse files Browse the repository at this point in the history
feat(sidetrack): cron jobs
  • Loading branch information
aniravi24 authored Oct 11, 2024
2 parents dd503d1 + d8df0c0 commit 19b66f5
Show file tree
Hide file tree
Showing 11 changed files with 390 additions and 30 deletions.
23 changes: 7 additions & 16 deletions packages/sidetrack/.kanelrc.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
const path = require("path");
const { resolveType } = require("kanel");

const { defaultGenerateIdentifierType } = require("kanel");
/** @type {import('kanel').Config} */
module.exports = {
connection: { connectionString: process.env.DATABASE_URL },
generateIdentifierType: (c, d, config) => {
// Id columns are already prefixed with the table name, so we don't need to add it here
const name = "Id";
const innerType = resolveType(c, d, {
...config,
generateIdentifierType: undefined,
});
return {
comment: [],
declarationType: "typeDeclaration",
exportAs: "named",
name,
typeDefinition: [innerType],
};
const defaultResult = defaultGenerateIdentifierType(c, d, config);
// Remove the brand from the type definition
defaultResult.typeDefinition = [
defaultResult.typeDefinition[0].split(" & ")[0],
];
return defaultResult;
},
outputPath: "./src/models/generated",
preDeleteOutputFolder: true,
Expand Down
32 changes: 32 additions & 0 deletions packages/sidetrack/migrations/2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { MigrationBuilder } from "@sidetrack/pg-migrate";

export default {
up: async (pgm: MigrationBuilder) => {
pgm.sql(`
CREATE TYPE sidetrack_cron_job_status_enum AS ENUM (
'active',
'inactive'
);
CREATE TABLE IF NOT EXISTS sidetrack_cron_jobs (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
queue TEXT NOT NULL,
cron_expression TEXT NOT NULL,
payload JSONB NOT NULL,
status sidetrack_cron_job_status_enum NOT NULL DEFAULT 'active',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE(queue, cron_expression)
);
ALTER TABLE sidetrack_jobs ADD COLUMN cron_job_id uuid REFERENCES sidetrack_cron_jobs(id);
ALTER TABLE sidetrack_jobs ADD COLUMN unique_key TEXT;
CREATE UNIQUE INDEX ON sidetrack_jobs (unique_key);
`);
},
down: async (_pgm: MigrationBuilder) => {},
name: "2",
};
174 changes: 170 additions & 4 deletions packages/sidetrack/src/effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import * as Layer from "effect/Layer";
import { fromIterableWith } from "effect/Record";
import * as Ref from "effect/Ref";
import * as Schedule from "effect/Schedule";
import * as Record from "effect/Record";
import * as Stream from "effect/Stream";
import pg from "pg";

import { SidetrackDatabaseClient, usePg } from "./client";
Expand All @@ -25,7 +27,12 @@ import {
SidetrackQueuesGenericType,
SidetrackRunJobOptions,
SidetrackRunJobsOptions,
SidetrackCronJobOptions,
SidetrackDeactivateCronScheduleOptions,
SidetrackDeleteCronScheduleOptions,
} from "./types";
import { Cron } from "effect";
import SidetrackCronJobs from "./models/generated/public/SidetrackCronJobs";

export interface SidetrackService<Queues extends SidetrackQueuesGenericType> {
cancelJob: (
Expand Down Expand Up @@ -54,6 +61,40 @@ export interface SidetrackService<Queues extends SidetrackQueuesGenericType> {
* Turn off polling
*/
stop: () => Effect.Effect<void>;
/**
* Schedule a cron job on a queue
* @param queueName - The queue to schedule the cron job on
* @param cronExpression - A 5 part cron expression
*/
scheduleCron: <K extends keyof Queues>(
queueName: K,
cronExpression: string,
payload: Queues[K],
options?: SidetrackCronJobOptions,
) => Effect.Effect<SidetrackCronJobs, Cron.ParseError>;

/**
* Deactivate a cron schedule. This prevents the cron schedule from creating new jobs.
* @param queueName - The queue to deactivate the cron job from
* @param cronExpression - The cron expression to deactivate
*/
deactivateCronSchedule: <K extends keyof Queues>(
queueName: K,
cronExpression: string,
options?: SidetrackDeactivateCronScheduleOptions,
) => Effect.Effect<void>;

/**
* Delete a cron schedule. This removes the cron job from the database.
* @param queueName - The queue to delete the cron job from
* @param cronExpression - The cron expression to delete
*/
deleteCronSchedule: <K extends keyof Queues>(
queueName: K,
cronExpression: string,
options?: SidetrackDeleteCronScheduleOptions,
) => Effect.Effect<void>;

/**
* Utilities meant to be used with tests only
*/
Expand Down Expand Up @@ -117,6 +158,11 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
Fiber.void,
);

// TODO should we use a hashmap or supervisor? We'll need to convert this layer into an Effect
const cronFibers = Ref.unsafeMake(
Record.empty<string, Fiber.Fiber<unknown, unknown>>(),
);

const startPolling = () =>
Effect.promise(() =>
dbClient.execute<SidetrackJobs>(
Expand Down Expand Up @@ -176,7 +222,9 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
);
return Effect.promise(() =>
runMigrations(databaseOptions.connectionString),
).pipe(Effect.flatMap(() => startPolling()));
)
.pipe(Effect.flatMap(() => startPolling()))
.pipe(Effect.flatMap(() => startCronSchedules()));
};

const cancelJob = (jobId: string, options?: SidetrackCancelJobOptions) =>
Expand All @@ -187,6 +235,7 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
),
).pipe(Effect.asVoid);

// TODO should we return the deleted job or the number of rows deleted? There is a difference between a job actually being deleted and a job not being found
const deleteJob = (jobId: string, options?: SidetrackDeleteJobOptions) =>
Effect.promise(() =>
(options?.dbClient || dbClient).execute(
Expand All @@ -198,6 +247,14 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
const stop = () =>
Ref.get(pollingFiber)
.pipe(Effect.flatMap((fiber) => Fiber.interrupt(fiber)))
.pipe(
Effect.flatMap(() => Ref.get(cronFibers)),
Effect.flatMap((fibers) =>
Effect.forEach(Record.values(fibers), (fiber) =>
Fiber.interrupt(fiber),
),
),
)
.pipe(Effect.asVoid);

const runHandler = (
Expand Down Expand Up @@ -287,13 +344,21 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
payload,
current_attempt,
max_attempts,
scheduled_at
) VALUES ('scheduled', $1, $2, 0, $3, $4) RETURNING *`,
scheduled_at,
unique_key
) VALUES ('scheduled', $1, $2, 0, $3, $4, $5)
${
options?.suppressDuplicateUniqueKeyErrors
? "ON CONFLICT (unique_key) DO NOTHING"
: ""
}
RETURNING *`,
[
queueName,
payload,
queues[queueName].options?.maxAttempts ?? 1,
options?.scheduledAt ?? new Date(),
options?.uniqueKey,
],
),
).pipe(Effect.map((result) => result.rows[0]));
Expand All @@ -306,6 +371,104 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
),
).pipe(Effect.map((result) => result.rows[0]));

const scheduleCron = <K extends keyof Queues>(
queueName: K,
cronExpression: string,
payload: Queues[K],
options?: SidetrackCronJobOptions,
) =>
Cron.parse(cronExpression)
.pipe(
Effect.flatMap((_cron) =>
Effect.promise(() =>
(options?.dbClient || dbClient).execute<SidetrackCronJobs>(
`INSERT INTO sidetrack_cron_jobs (queue, cron_expression, payload)
VALUES ($1, $2, $3)
ON CONFLICT (queue, cron_expression) DO UPDATE
SET payload = $3 RETURNING *`,
[queueName, cronExpression, payload],
),
),
),
)

.pipe(
Effect.map((result) => result.rows[0]),
Effect.tap((cronJob) => startCronJob(cronJob, options)),
);

/**
* @internal
*/
const startCronSchedules = () =>
Effect.promise(() =>
dbClient.execute<SidetrackCronJobs>(
`SELECT * FROM sidetrack_cron_jobs`,
),
).pipe(
Effect.flatMap((result) =>
Effect.forEach(
result.rows,
(cronJob) => startCronJob(cronJob, cronJob.payload as any),
{ concurrency: "inherit" },
),
),
);

/**
* @internal
*/
const startCronJob = (
cronJob: SidetrackCronJobs,
options?: SidetrackCronJobOptions,
) => {
// This grabs the interval within which the cron job is running, and uses that as a unique key so multiple cron jobs on the same schedule don't conflict
// alternatively, we could explore using cron.next and just using Effect.schedule instead of draining a stream
return Stream.fromSchedule(Schedule.cron(cronJob.cron_expression)).pipe(
Stream.mapEffect((value) =>
insertJob(cronJob.queue, cronJob.payload as any, {
...options,
uniqueKey: value.toString(),
suppressDuplicateUniqueKeyErrors: true,
}),
),
Stream.catchAllCause(Effect.logError),
Stream.runDrain,
Effect.forkDaemon,
Effect.flatMap((fiber) =>
Ref.update(cronFibers, (fibers) =>
Record.set(fibers, cronJob.id, fiber),
),
),
);
};

// TODO should we return the updated cron job or the number of rows updated? There is a difference between a cron job actually being updated and a cron job not being found
const deactivateCronSchedule = <K extends keyof Queues>(
queueName: K,
cronExpression: string,
options?: SidetrackDeactivateCronScheduleOptions,
) =>
Effect.promise(() =>
(options?.dbClient || dbClient).execute(
`UPDATE sidetrack_cron_jobs SET status = 'inactive' WHERE queue = $1 AND cron_expression = $2`,
[queueName, cronExpression],
),
).pipe(Effect.asVoid);

// TODO should we return the deleted cron job or the number of rows deleted? There is a difference between a cron job actually being deleted and a cron job not being found
const deleteCronSchedule = <K extends keyof Queues>(
queueName: K,
cronExpression: string,
options?: SidetrackDeleteCronScheduleOptions,
) =>
Effect.promise(() =>
(options?.dbClient || dbClient).execute(
`DELETE FROM sidetrack_cron_jobs WHERE queue = $1 AND cron_expression = $2`,
[queueName, cronExpression],
),
).pipe(Effect.asVoid);

const runJob = (jobId: string, options?: SidetrackRunJobOptions) =>
Effect.promise(() =>
(options?.dbClient || dbClient).execute<SidetrackJobs>(
Expand Down Expand Up @@ -432,12 +595,15 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
insertJob,
start,
stop,
deactivateCronSchedule,
deleteCronSchedule,
testUtils: {
listJobStatuses,
listJobs,
runJob,
runJobs: runJobs,
runJobs,
},
scheduleCron,
};
});
}
56 changes: 56 additions & 0 deletions packages/sidetrack/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import SidetrackJobStatusEnum from "./models/generated/public/SidetrackJobStatus
import { makeAppRuntime } from "./runtime";
import {
SidetrackCancelJobOptions,
SidetrackCronJobOptions,
SidetrackDeactivateCronScheduleOptions,
SidetrackDeleteCronScheduleOptions,
SidetrackDeleteJobOptions,
SidetrackGetJobOptions,
SidetrackInsertJobOptions,
Expand All @@ -22,6 +25,7 @@ import {
SidetrackRunJobOptions,
SidetrackRunJobsOptions,
} from "./types";
import SidetrackCronJobs from "./models/generated/public/SidetrackCronJobs";

/**
* Main class that contains all the primary methods for interacting with Sidetrack
Expand Down Expand Up @@ -89,6 +93,58 @@ export class Sidetrack<Queues extends SidetrackQueuesGenericType> {
);
}

/**
* Schedule a cron job on a queue
* @param queueName - The queue to schedule the cron job on
* @param cronExpression - A 5 part cron expression
*/
async scheduleCron<K extends keyof Queues>(
queueName: K,
cronExpression: string,
payload: Queues[K],
options?: SidetrackCronJobOptions,
): Promise<SidetrackCronJobs> {
return this.customRunPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.scheduleCron(queueName, cronExpression, payload, options),
),
);
}

/**
* Deactivate a cron schedule. This prevents the cron schedule from creating new jobs.
* @param queueName - The queue to deactivate the cron job from
* @param cronExpression - The cron expression to deactivate
*/
async deactivateCronSchedule<K extends keyof Queues>(
queueName: K,
cronExpression: string,
options?: SidetrackDeactivateCronScheduleOptions,
) {
return this.customRunPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.deactivateCronSchedule(queueName, cronExpression, options),
),
);
}

/**
* Delete a cron schedule. This removes the cron job from the database.
* @param queueName - The queue to delete the cron job from
* @param cronExpression - The cron expression to delete
*/
async deleteCronSchedule<K extends keyof Queues>(
queueName: K,
cronExpression: string,
options?: SidetrackDeleteCronScheduleOptions,
) {
return this.customRunPromise(
Effect.flatMap(this.sidetrackService, (service) =>
service.deleteCronSchedule(queueName, cronExpression, options),
),
);
}

/**
* Automatically run migrations and start polling the DB for jobs
*/
Expand Down
Loading

0 comments on commit 19b66f5

Please sign in to comment.