Skip to content

Commit

Permalink
Merge pull request #250 from sidetracklabs/cron-tz-support
Browse files Browse the repository at this point in the history
feat(sidetrack): timezone cron support
  • Loading branch information
aniravi24 authored Jan 4, 2025
2 parents d90457b + 22dd718 commit 686ce13
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 21 deletions.
1 change: 1 addition & 0 deletions packages/sidetrack/migrations/2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export default {
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
queue TEXT NOT NULL,
cron_expression TEXT NOT NULL,
timezone TEXT,
payload JSONB NOT NULL,
status sidetrack_cron_job_status_enum NOT NULL DEFAULT 'active',
inserted_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
Expand Down
51 changes: 33 additions & 18 deletions packages/sidetrack/src/effect.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Cron } from "effect";
import * as Context from "effect/Context";
import * as DateTime from "effect/DateTime";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Fiber from "effect/Fiber";
Expand Down Expand Up @@ -428,15 +429,20 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
payload: Queues[K],
options?: SidetrackCronJobOptions,
) =>
Cron.parse(cronExpression).pipe(
Cron.parse(
cronExpression,
options?.timezone
? DateTime.zoneUnsafeMakeNamed(options.timezone)
: undefined,
).pipe(
Effect.flatMap((_cron) =>
Effect.promise(() =>
(options?.dbClient || dbClient).execute<SidetrackCronJobs>(
`INSERT INTO sidetrack_cron_jobs (queue, cron_expression, payload)
VALUES ($1, $2, $3)
`INSERT INTO sidetrack_cron_jobs (queue, cron_expression, payload, timezone)
VALUES ($1, $2, $3, $4)
ON CONFLICT (queue, cron_expression) DO UPDATE
SET payload = $3 RETURNING *`,
[queueName, cronExpression, payload],
[queueName, cronExpression, payload, options?.timezone],
),
),
),
Expand Down Expand Up @@ -469,20 +475,29 @@ export function makeLayer<Queues extends SidetrackQueuesGenericType>(
) => {
// This grabs the interval within which the cron job is running, and uses that as a unique key so multiple cron jobs on the same schedule don't conflict
// alternatively, we could explore using cron.next and just using Effect.schedule instead of draining a stream
return Stream.fromSchedule(Schedule.cron(cronJob.cron_expression)).pipe(
Stream.mapEffect((value) =>
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any
insertJob(cronJob.queue, cronJob.payload as any, {
...options,
suppressDuplicateUniqueKeyErrors: true,
uniqueKey: value.toString(),
}),
),
Stream.catchAllCause(Effect.logError),
Stream.runDrain,
Effect.supervised(cronSupervisor),
Effect.fork,
);
return Cron.parse(
cronJob.cron_expression,
cronJob.timezone
? DateTime.zoneUnsafeMakeNamed(cronJob.timezone)
: undefined,
)
.pipe(
Stream.flatMap((cron) => Stream.fromSchedule(Schedule.cron(cron))),
)
.pipe(
Stream.mapEffect((value) =>
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any
insertJob(cronJob.queue, cronJob.payload as any, {
...options,
suppressDuplicateUniqueKeyErrors: true,
uniqueKey: value.toString(),
}),
),
Stream.catchAllCause(Effect.logError),
Stream.runDrain,
Effect.supervised(cronSupervisor),
Effect.fork,
);
};

// TODO should we return the updated cron job or the number of rows updated? There is a difference between a cron job actually being updated and a cron job not being found
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// @generated
// This file is automatically generated by Kanel. Do not modify manually.

/** Represents the enum public.sidetrack_cron_job_status_enum */
enum SidetrackCronJobStatusEnum {
active = 'active',
inactive = 'inactive',
};

export default SidetrackCronJobStatusEnum;
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// @generated
// This file is automatically generated by Kanel. Do not modify manually.

import type { default as SidetrackCronJobStatusEnum } from './SidetrackCronJobStatusEnum';

/** Identifier type for public.sidetrack_cron_jobs */
export type SidetrackCronJobsId = string;

Expand All @@ -12,9 +14,13 @@ export default interface SidetrackCronJobs {

cron_expression: string;

timezone: string | null;

payload: unknown;

created_at: Date | null;
status: SidetrackCronJobStatusEnum;

inserted_at: Date | null;

updated_at: Date | null;
}
Expand All @@ -28,10 +34,15 @@ export interface SidetrackCronJobsInitializer {

cron_expression: string;

timezone?: string | null;

payload: unknown;

/** Default value: 'active'::sidetrack_cron_job_status_enum */
status?: SidetrackCronJobStatusEnum;

/** Default value: now() */
created_at?: Date | null;
inserted_at?: Date | null;

/** Default value: now() */
updated_at?: Date | null;
Expand All @@ -45,9 +56,13 @@ export interface SidetrackCronJobsMutator {

cron_expression?: string;

timezone?: string | null;

payload?: unknown;

created_at?: Date | null;
status?: SidetrackCronJobStatusEnum;

inserted_at?: Date | null;

updated_at?: Date | null;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export default interface SidetrackJobs {
completed_at: Date | null;

cron_job_id: SidetrackCronJobsId | null;

unique_key: string | null;
}

/** Represents the initializer for the table public.sidetrack_jobs */
Expand Down Expand Up @@ -70,6 +72,8 @@ export interface SidetrackJobsInitializer {
completed_at?: Date | null;

cron_job_id?: SidetrackCronJobsId | null;

unique_key?: string | null;
}

/** Represents the mutator for the table public.sidetrack_jobs */
Expand Down Expand Up @@ -101,4 +105,6 @@ export interface SidetrackJobsMutator {
completed_at?: Date | null;

cron_job_id?: SidetrackCronJobsId | null;

unique_key?: string | null;
}
1 change: 1 addition & 0 deletions packages/sidetrack/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface SidetrackInsertJobOptions {

export interface SidetrackCronJobOptions {
dbClient?: SidetrackDatabaseClient;
timezone?: string;
}

export interface SidetrackDeactivateCronScheduleOptions {
Expand Down

0 comments on commit 686ce13

Please sign in to comment.