Skip to content

Commit

Permalink
refactor: save chainKey instead of just id
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Dec 4, 2024
1 parent 5d3c4b5 commit 16a50a4
Show file tree
Hide file tree
Showing 21 changed files with 150 additions and 66 deletions.
12 changes: 6 additions & 6 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
self.redisConnection = redisConnection
self.redisClient = redisConnection.conn
self.commands = {
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-8.lua")),
"addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")),
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-9.lua")),
"addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-7.lua")),
"addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")),
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")),
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-9.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")),
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-3.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
Expand Down Expand Up @@ -119,7 +119,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None):
Add a standard job to the queue
"""
keys = self.getKeys(['wait', 'paused', 'meta', 'id',
'completed', 'active', 'events', 'marker'])
'completed', 'active', 'events', 'marker', 'waiting-children'])
args = self.addJobArgs(job, None)
args.append(timestamp)

Expand All @@ -130,7 +130,7 @@ def addDelayedJob(self, job: Job, timestamp: int, pipe = None):
Add a delayed job to the queue
"""
keys = self.getKeys(['marker', 'meta', 'id',
'delayed', 'completed', 'events'])
'delayed', 'completed', 'events', 'waiting-children'])
args = self.addJobArgs(job, None)
args.append(timestamp)

Expand All @@ -141,7 +141,7 @@ def addPrioritizedJob(self, job: Job, timestamp: int, pipe = None):
Add a prioritized job to the queue
"""
keys = self.getKeys(['marker', 'meta', 'id',
'prioritized', 'completed', 'active', 'events', 'pc'])
'prioritized', 'completed', 'active', 'events', 'pc', 'waiting-children'])
args = self.addJobArgs(job, None)
args.append(timestamp)

Expand Down
15 changes: 9 additions & 6 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
removeUndefinedFields,
optsAsJSON,
optsFromJSON,
getChainKey,
} from '../utils';
import { Backoffs } from './backoffs';
import { Scripts, raw2NextJobData } from './scripts';
Expand Down Expand Up @@ -149,9 +150,9 @@ export class Job<
deduplicationId?: string;

/**
* Chain identifier.
* Chain key.
*/
chainId?: string;
chainKey?: string;

/**
* Base repeat job key.
Expand Down Expand Up @@ -222,7 +223,9 @@ export class Job<
? opts.deduplication.id
: this.debounceId;

this.chainId = opts.chainId;
this.chainKey = opts.chain
? getChainKey(opts.chain, queue.qualifiedName)
: undefined;

this.toKey = queue.toKey.bind(queue);
this.setScripts();
Expand Down Expand Up @@ -347,8 +350,8 @@ export class Job<
job.repeatJobKey = json.rjk;
}

if (json.cid) {
job.chainId = json.cid;
if (json.chkey) {
job.chainKey = json.chkey;
}

if (json.deid) {
Expand Down Expand Up @@ -458,7 +461,7 @@ export class Job<
timestamp: this.timestamp,
failedReason: JSON.stringify(this.failedReason),
stacktrace: JSON.stringify(this.stacktrace),
chainId: this.chainId,
chainKey: this.chainKey,
debounceId: this.debounceId,
deduplicationId: this.deduplicationId,
repeatJobKey: this.repeatJobKey,
Expand Down
3 changes: 1 addition & 2 deletions src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ export class QueueKeys {
'pc', // priority counter key
'marker', // marker key
'de', // deduplication key
'ch', // chains key
].forEach(key => {
keys[key] = this.toKey(name, key);
});

keys['chains'] = `${this.prefix}:chains`;

return keys;
}

Expand Down
2 changes: 1 addition & 1 deletion src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ export class Queue<
async removeChainKey(id: string): Promise<number> {
const client = await this.client;

return client.del(`${this.keys.chains}:${id}`);
return client.del(`${this.keys.ch}:${id}`);
}

/**
Expand Down
3 changes: 1 addition & 2 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export class Scripts {
parent,
job.repeatJobKey,
job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null,
job.chainId ? `${queueKeys.chains}:${job.chainId}` : null,
job.chainKey ? job.chainKey : null,
];

let encodedOpts;
Expand Down Expand Up @@ -564,7 +564,6 @@ export class Scripts {
keys[11] = this.queue.toKey(job.id ?? '');
keys[12] = metricsKey;
keys[13] = this.queue.keys.marker;
keys[14] = this.queue.keys.chains;

const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs);

Expand Down
2 changes: 1 addition & 1 deletion src/commands/addDelayedJob-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ end
-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
opts, timestamp, parentKey, parentData,
repeatJobKey)
repeatJobKey, chainKey)

local lastJobKeyInChain = upsertChainKeyIfNeeded(chainKey, jobIdKey)

Expand Down
2 changes: 1 addition & 1 deletion src/commands/addPrioritizedJob-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ end
-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
opts, timestamp, parentKey, parentData,
repeatJobKey)
repeatJobKey, chainKey)

local lastJobKeyInChain = upsertChainKeyIfNeeded(chainKey, jobIdKey)

Expand Down
6 changes: 3 additions & 3 deletions src/commands/addStandardJob-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ local parentData
--- @include "includes/upsertChainKeyIfNeeded"

if parentKey ~= nil then
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
if rcall("EXISTS", parentKey) ~= 1 then return -5 end

parentData = cjson.encode(parent)
parentData = cjson.encode(parent)
end

local jobCounter = rcall("INCR", KEYS[4])
Expand Down Expand Up @@ -107,7 +107,7 @@ end

-- Store the job.
storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
parentKey, parentData, repeatJobKey)
parentKey, parentData, repeatJobKey, chainKey)

local lastJobKeyInChain = upsertChainKeyIfNeeded(chainKey, jobIdKey)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey,
local jobAttributes = rcall("HMGET", parentKey, "parent", "deid", "opts", "cid")

removeDeduplicationKeyIfNeeded(parentQueuePrefix, jobAttributes[2])
removeChainKeyIfNeeded(parentQueuePrefix, jobAttributes[4], parentKey)
removeChainKeyIfNeeded(jobAttributes[4], parentKey)

if jobAttributes[1] then
local parentData = cjson.decode(jobAttributes[1])
Expand Down
7 changes: 3 additions & 4 deletions src/commands/includes/removeChainKeyIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
Function to remove chain key if needed.
]]

local function removeChainKeyIfNeeded(chainsKey, chainId, jobKey)
if chainId then
local chainKey = chainsKey .. ":" .. chainId
local function removeChainKeyIfNeeded(chainKey, jobKey)
if chainKey then
local lastJobKeyInChain = rcall("GET", chainKey)

if lastJobKeyInChain == jobKey then
rcall("DEL", chainKey)
end
end
end
end
2 changes: 1 addition & 1 deletion src/commands/includes/removeJob.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
if shouldRemoveDeduplicationKey then
removeDeduplicationKey(baseKey, jobKey)
end
removeChainKeyIfNeeded(baseKey, rcall("HGET", jobKey, "cid"), jobKey)
removeChainKeyIfNeeded(rcall("HGET", jobKey, "cid"), jobKey)
removeJobKeys(jobKey)
end
8 changes: 4 additions & 4 deletions src/commands/includes/storeJob.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Function to store a job
]]
local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
parentKey, parentData, repeatJobKey)
parentKey, parentData, repeatJobKey, chainKey)
local jsonOpts = cjson.encode(opts)
local delay = opts['delay'] or 0
local priority = opts['priority'] or 0
Expand All @@ -26,9 +26,9 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
table.insert(optionalValues, debounceId)
end

if opts['cid'] then
table.insert(optionalValues, "cid")
table.insert(optionalValues, opts['cid'])
if chainKey then
table.insert(optionalValues, "chkey")
table.insert(optionalValues, chainKey)
end

rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
Expand Down
2 changes: 1 addition & 1 deletion src/commands/moveStalledJobsToWait-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ if (#stalling > 0) then
local opts = cjson.decode(rawOpts)
rcall("ZADD", failedKey, timestamp, jobId)
removeDeduplicationKeyIfNeeded(queueKeyPrefix, jobAttributes[3])
removeChainKeyIfNeeded(queueKeyPrefix, jobAttributes[4], jobKey)
removeChainKeyIfNeeded(jobAttributes[4], jobKey)

local failedReason =
"job stalled more than allowable limit"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
KEYS[12] jobId key
KEYS[13] metrics key
KEYS[14] marker key
KEYS[15] chains key
ARGV[1] jobId
ARGV[2] timestamp
Expand Down Expand Up @@ -96,7 +95,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
return -4
end

local jobAttributes = rcall("HMGET", jobIdKey, "parentKey", "parent", "deid", "cid")
local jobAttributes = rcall("HMGET", jobIdKey, "parentKey", "parent", "deid", "chkey")
local parentKey = jobAttributes[1] or ""
local parentId = ""
local parentQueueKey = ""
Expand All @@ -122,7 +121,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
local prefix = ARGV[7]

removeDeduplicationKeyIfNeeded(prefix, jobAttributes[3])
removeChainKeyIfNeeded(KEYS[15], jobAttributes[4], jobIdKey)
removeChainKeyIfNeeded(jobAttributes[4], jobIdKey)

-- If job has a parent we need to
-- 1) remove this job id from parents dependencies
Expand Down
2 changes: 1 addition & 1 deletion src/commands/removeJob-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ local function removeJob(prefix, jobId, parentKey, removeChildren)
local prev = removeJobFromAnyState(prefix, jobId)

removeDeduplicationKey(prefix, jobKey)
removeChainnKeyIfNeeded(prefix, rcall("HGET", jobKey, "cid"), jobKey)
removeChainKeyIfNeeded(rcall("HGET", jobKey, "cid"), jobKey)
if removeJobKeys(jobKey) > 0 then
local maxEvents = getOrSetMaxEvents(KEYS[2])
rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed", "jobId", jobId, "prev",
Expand Down
9 changes: 9 additions & 0 deletions src/interfaces/chain-options.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export interface ChainOptions {
id: string;

/**
* It includes the prefix, the namespace separator :, and queue name.
* @see https://www.gnu.org/software/gawk/manual/html_node/Qualified-Names.html
*/
queueKey?: string;
}
1 change: 1 addition & 0 deletions src/interfaces/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from './advanced-options';
export * from './backoff-options';
export * from './base-job-options';
export * from './chain-options';
export * from './child-message';
export * from './connection';
export * from './debounce-options';
Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/job-json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export interface JobJson {
repeatJobKey?: string;
debounceId?: string;
deduplicationId?: string;
chainId?: string;
chainKey?: string;
processedBy?: string;
}

Expand All @@ -40,7 +40,7 @@ export interface JobJsonRaw {
returnvalue: string;
parentKey?: string;
parent?: string;
cid?: string;
chkey?: string;
deid?: string;
rjk?: string;
atm?: string;
Expand Down
11 changes: 8 additions & 3 deletions src/types/job-options.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { BaseJobOptions, DebounceOptions } from '../interfaces';
import { BaseJobOptions, ChainOptions, DebounceOptions } from '../interfaces';

export type JobsOptions = BaseJobOptions & {
/**
* Chain identifier to add a new dependency on a flow chain identified by this option.
* Chain options to add a new dependency on a flow chain.
*/
chainId?: string;
chain?: ChainOptions;

/**
* Debounce options.
Expand Down Expand Up @@ -37,6 +37,11 @@ export type JobsOptions = BaseJobOptions & {
* These fields are the ones stored in Redis with smaller keys for compactness.
*/
export type RedisJobOptions = BaseJobOptions & {
/**
* Debounce identifier.
*/
ch?: ChainOptions;

/**
* Debounce identifier.
*/
Expand Down
14 changes: 13 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { AbortController } from 'node-abort-controller';
// @ts-ignore
import { CONNECTION_CLOSED_ERROR_MSG } from 'ioredis/built/utils';
import {
ChainOptions,
ChildMessage,
ContextManager,
RedisClient,
Expand Down Expand Up @@ -155,6 +156,17 @@ export function getParentKey(opts: {
}
}

export function getChainKey(
opts: ChainOptions,
queueQualifiedName: string,
): string | undefined {
if (opts) {
return `${opts.queueKey ? opts.queueKey : queueQualifiedName}:ch:${
opts.id
}`;
}
}

export const clientCommandMessageReg =
/ERR unknown command ['`]\s*client\s*['`]/;

Expand Down Expand Up @@ -272,7 +284,7 @@ export const toString = (value: any): string => {
export const QUEUE_EVENT_SUFFIX = ':qe';

const optsDecodeMap = {
cid: 'chainId',
ch: 'chain',
de: 'deduplication',
fpof: 'failParentOnFailure',
idof: 'ignoreDependencyOnFailure',
Expand Down
Loading

0 comments on commit 16a50a4

Please sign in to comment.