# perf: keep jobs in waiting list when queue is paused #2769
Changes from 35 commits
```diff
@@ -0,0 +1,27 @@
+---
+description: Tips and hints on how to migrate to v6.
+---
+
+# Migration to v6
+
+Make sure to pass the **skipMigrationsExecution** option as false in any of our instances in order to execute all necessary changes when coming from an older version.
+
+## Migration of deprecated paused key
+
+If you have paused queues, after upgrading to this version these jobs will be moved to the wait state when initializing any of our instances (Worker, Queue, QueueEvents or FlowProducer).
+
+The paused key is no longer needed, as this state is already represented inside the meta key. It also improves the process of pausing or resuming a queue, as we don't need to rename any key.
```

> **Review comment:** by meta key -> inside the meta key
>
> **Review comment:** "It also improves..."

```diff
+
+## Remove legacy markers
```

> **Review comment:** I think we are going to need more detailed information on how to safely perform the migration.

```diff
+
+When migrating from versions before v5, it's recommended to follow this process:
+
+1. Pause your queues.
+2. Upgrade to v6.
+3. Instantiate any instance where migrations will be executed.
+4. Resume your queues.
+
+This way you will prevent your workers from picking up a legacy marker.
+
+A second option would be to do incremental upgrades.
```
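To make the recommended process above concrete, here is a minimal sketch of steps 3 and 4 as a deployment script, assuming the v6 `Queue` constructor accepts the `skipMigrationsExecution` option named in this guide (its exact placement in `QueueOptions` is an assumption):

```ts
import { Queue } from 'bullmq';

// Runs after steps 1 and 2: queues already paused, package upgraded to v6.
async function finishMigration(queueName: string) {
  // Step 3: instantiating any instance executes pending migrations,
  // provided skipMigrationsExecution is false (option shape assumed).
  const queue = new Queue(queueName, {
    connection: { host: 'localhost', port: 6379 },
    skipMigrationsExecution: false,
  });
  await queue.waitUntilReady();

  // Step 4: resume the queue so workers start picking jobs again.
  await queue.resume();
  await queue.close();
}
```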
```diff
@@ -38,11 +38,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
         self.redisConnection = redisConnection
         self.redisClient = redisConnection.conn
         self.commands = {
-            "addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-8.lua")),
+            "addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-7.lua")),
             "addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")),
             "addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")),
             "addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")),
-            "changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")),
+            "changePriority": self.redisClient.register_script(self.getScript("changePriority-6.lua")),
             "cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-3.lua")),
             "extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
             "getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
@@ -51,18 +51,19 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
             "getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
             "getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
             "isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
-            "moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")),
-            "moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
+            "moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
+            "moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")),
             "moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
-            "moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
+            "moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-13.lua")),
             "moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
             "obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
             "pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
-            "promote": self.redisClient.register_script(self.getScript("promote-9.lua")),
+            "promote": self.redisClient.register_script(self.getScript("promote-8.lua")),
             "removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")),
-            "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-8.lua")),
-            "retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")),
-            "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-8.lua")),
+            "migrateDeprecatedPausedKey": self.redisClient.register_script(self.getScript("migrateDeprecatedPausedKey-2.lua")),
+            "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
+            "retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
+            "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
             "saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
             "updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
             "updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")),
@@ -131,7 +132,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None):
         """
         Add a standard job to the queue
         """
-        keys = self.getKeys(['wait', 'paused', 'meta', 'id',
+        keys = self.getKeys(['wait', 'meta', 'id',
                              'completed', 'active', 'events', 'marker'])
         args = self.addJobArgs(job, None)
         args.append(timestamp)
@@ -259,15 +260,15 @@ def saveStacktraceArgs(self, job_id: str, stacktrace: str, failedReason: str):
         return (keys, args)

     def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}):
-        keys = self.getKeys(['active', 'wait', 'paused'])
+        keys = self.getKeys(['active', 'wait'])
         keys.append(self.toKey(job_id))
         keys.append(self.keys['meta'])
         keys.append(self.keys['events'])
         keys.append(self.keys['delayed'])
         keys.append(self.keys['prioritized'])
         keys.append(self.keys['pc'])
-        keys.append(self.keys['marker'])
         keys.append(self.keys['stalled'])
+        keys.append(self.keys['marker'])

         push_cmd = "RPUSH" if lifo else "LPUSH"

@@ -302,7 +303,6 @@ def promoteArgs(self, job_id: str):
         keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'active', 'pc', 'events', 'marker'])
         keys.append(self.toKey(job_id))
         keys.append(self.keys['events'])
-        keys.append(self.keys['paused'])
         keys.append(self.keys['meta'])

         args = [self.keys[''], job_id]
@@ -374,7 +374,6 @@ async def isJobInList(self, list_key: str, job_id: str):

     async def changePriority(self, job_id: str, priority: int = 0, lifo: bool = False):
         keys = [self.keys['wait'],
-                self.keys['paused'],
                 self.keys['meta'],
                 self.keys['prioritized'],
                 self.keys['active'],
@@ -408,7 +407,6 @@ async def reprocessJob(self, job: Job, state: str):
         keys.append(self.keys[state])
         keys.append(self.keys['wait'])
         keys.append(self.keys['meta'])
-        keys.append(self.keys['paused'])
         keys.append(self.keys['active'])
         keys.append(self.keys['marker'])

@@ -450,7 +448,7 @@ async def obliterate(self, count: int, force: bool = False):

     def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int:
         keys = self.getKeys(
-            ['', 'events', state, 'wait', 'paused', 'meta', 'active', 'marker'])
+            ['', 'events', state, 'wait', 'meta', 'active', 'marker'])

         args = [count or 1000, timestamp or round(time.time()*1000), state]
         return (keys, args)
@@ -465,6 +463,15 @@ async def retryJobs(self, state: str, count: int, timestamp: int):
         result = await self.commands["moveJobsToWait"](keys=keys, args=args)
         return result

+    async def migrateDeprecatedPausedKey(self, maxCount: int):
+        keys = self.getKeys(
+            ['paused', 'wait'])
+
+        args = [maxCount]
+
+        result = await self.commands["migrateDeprecatedPausedKey"](keys=keys, args=args)
+        return result
+
     async def promoteJobs(self, count: int):
         """
         Promote jobs in delayed state
```

> **Review thread on `migrateDeprecatedPausedKey`:**
>
> Do we really need this? I think for migrations, users should use the migration script available in NodeJS for now.
>
> But what I think we need in the Python version is to throw an exception if you are trying to run this version on an older BullMQ version that has not yet been migrated.
>
> I'll add this logic in a following PR.

```diff
@@ -483,7 +490,7 @@ async def moveToActive(self, token: str, opts: dict) -> list[Any]:
         limiter = opts.get("limiter", None)

         keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
-                             'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', 'marker'])
+                             'stalled', 'limiter', 'delayed', 'meta', 'pc', 'marker'])
         packedOpts = msgpack.packb(
             {"token": token, "lockDuration": lockDuration, "limiter": limiter}, use_bin_type=True)
         args = [self.keys[''], timestamp, packedOpts]
@@ -516,7 +523,7 @@ def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, tar
         metricsKey = self.toKey('metrics:' + target)

         keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
-                             'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', target])
+                             'stalled', 'limiter', 'delayed', 'meta', 'pc', target])
         keys.append(self.toKey(job.id))
         keys.append(metricsKey)
         keys.append(self.keys['marker'])
@@ -580,7 +587,7 @@ def extendLock(self, jobId: str, token: str, duration: int, client: Redis = None

     def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int):
         keys = self.getKeys(['stalled', 'wait', 'active', 'failed',
-                             'stalled-check', 'meta', 'paused', 'marker', 'events'])
+                             'stalled-check', 'meta', 'marker', 'events'])
         args = [maxStalledCount, self.keys[''], round(
             time.time() * 1000), stalledInterval]
         return self.commands["moveStalledJobsToWait"](keys, args)
```
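The common thread in this file is that the `'paused'` key disappears from almost every KEYS list: in v6 the paused state lives in the queue's meta key, so jobs remain in the single wait list. As a rough illustration (not part of this PR), checking whether a queue is paused could reduce to reading a field from the meta hash; the field name `paused` is an assumption here:

```ts
import IORedis from 'ioredis';

// Illustrative only: with the paused flag stored in the meta hash,
// pause/resume no longer needs to rename wait <-> paused keys.
async function isQueuePaused(
  client: IORedis,
  prefix: string,
  queueName: string,
): Promise<boolean> {
  // Assumed meta layout: a hash whose 'paused' field is set while paused.
  const paused = await client.hget(`${prefix}:${queueName}:meta`, 'paused');
  return paused !== null;
}
```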
```diff
@@ -39,10 +39,11 @@ const logger = debuglog('bull');

 const optsDecodeMap = {
   de: 'deduplication',
   fpof: 'failParentOnFailure',
   idof: 'ignoreDependencyOnFailure',
   kl: 'keepLogs',
+  ocf: 'onChildFailure',
   rdof: 'removeDependencyOnFailure',
 };

 const optsEncodeMap = invertObject(optsDecodeMap);
```

> **Review thread on `optsDecodeMap`:**
>
> Won't we need to keep these old mappings so as not to cause a data breaking change?
>
> Maybe we should introduce a "migration" mechanism here, something like migration steps for going between versions that are run if required in Lua scripts for atomicity. But we would need to keep a version number in the meta key, adding complexity.
>
> We can keep these mappings; they will only be used to format options of jobs with these old values. No migration is needed, as we are evaluating both the old and the new option in the Lua scripts; new jobs will use the new option.
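For context, `optsEncodeMap` is simply `optsDecodeMap` inverted, so legacy short keys such as `fpof` still decode while new jobs are encoded with the new `ocf` key. A small self-contained sketch (`invertObject` is reimplemented here as an assumption, since its definition is not part of this diff):

```ts
// Plausible stand-in for the library's invertObject helper.
function invertObject(obj: Record<string, string>): Record<string, string> {
  return Object.fromEntries(
    Object.entries(obj).map(([shortKey, longKey]) => [longKey, shortKey]),
  );
}

const optsDecodeMap: Record<string, string> = {
  de: 'deduplication',
  fpof: 'failParentOnFailure',
  idof: 'ignoreDependencyOnFailure',
  kl: 'keepLogs',
  ocf: 'onChildFailure',
  rdof: 'removeDependencyOnFailure',
};
const optsEncodeMap = invertObject(optsDecodeMap);

// Old jobs stored with the legacy key still decode...
console.log(optsDecodeMap['fpof']); // 'failParentOnFailure'
// ...while new jobs are written using the new option.
console.log(optsEncodeMap['onChildFailure']); // 'ocf'
```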
```diff
@@ -0,0 +1,116 @@
+import { RedisClient } from '../interfaces';
+
+export interface MigrationOptions {
+  prefix: string;
+  queueName: string;
+}
+
+export type MigrationFunction = (
+  client: RedisClient,
+  opts: MigrationOptions,
+) => Promise<void>;
+
+export const checkPendingMigrations = async (
+  client: RedisClient,
+  opts: MigrationOptions,
+) => {
+  const migrationsKey = getRedisKeyFromOpts(opts, 'migrations');
+  const existingMigrations = await client.zrange(migrationsKey, 0, -1);
+  return migrations.some(
+    migration =>
+      !existingMigrations.includes(`${migration.version}-${migration.name}`),
+  );
+};
+
+export const migrations: {
+  name: string;
+  version: string;
+  migrate: MigrationFunction;
+}[] = [
+  {
+    name: 'remove-legacy-markers',
+    version: '6.0.0',
+    migrate: async (client: RedisClient, opts: MigrationOptions) => {
+      const keys: (string | number)[] = [
+        getRedisKeyFromOpts(opts, 'wait'),
+        getRedisKeyFromOpts(opts, 'paused'),
+        getRedisKeyFromOpts(opts, 'meta'),
+        getRedisKeyFromOpts(opts, 'completed'),
+        getRedisKeyFromOpts(opts, 'failed'),
+      ];
+      const args = [getRedisKeyFromOpts(opts, '')];
+
+      await (<any>client).removeLegacyMarkers(keys.concat(args));
+    },
+  },
+  {
+    name: 'migrate-paused-jobs',
+    version: '6.0.0',
+    migrate: async (client: RedisClient, opts: MigrationOptions) => {
+      let cursor = 0;
+      do {
+        const keys: (string | number)[] = [
+          getRedisKeyFromOpts(opts, 'paused'),
+          getRedisKeyFromOpts(opts, 'wait'),
+        ];
+        const args = [1000];
+        cursor = await (<any>client).migrateDeprecatedPausedKey(
+          keys.concat(args),
+        );
+      } while (cursor);
+    },
+  },
+];
+
+/**
+ * Run Migrations.
+ *
+ * This method is used to run possibly existing migrations for the queue.
+ *
+ * Normally, if there are pending migrations, the Queue, Worker and QueueEvents
+ * instances will throw an error when they are instantiated. Use this method to
+ * run the migrations before instantiating the instances.
+ *
+ * @param redisClient The Redis client instance
+ * @param opts The options for the migration
+ *
+ * @sa https://docs.bullmq.io/guide/migrations
+ */
+export const runMigrations = async (
+  redisClient: RedisClient,
+  opts: {
+    prefix?: string;
+    queueName: string;
+  },
+) => {
+  const prefix = opts.prefix || 'bull';
+  const migrationsKey = getRedisKeyFromOpts({ ...opts, prefix }, 'migrations');
+
+  // The migrations key is a ZSET with the migration timestamp as the score
+  for (const migration of migrations) {
+    const migrationId = `${migration.version}-${migration.name}`;
+    const pendingMigration = !!(await redisClient.zscore(
+      migrationsKey,
+      migrationId,
+    ));
+    if (pendingMigration) {
+      continue;
+    }
+    console.log(`[BULLMQ] Running migration ${migrationId}`);
+    try {
+      await migration.migrate(redisClient, {
+        prefix,
+        queueName: opts.queueName,
+      });
+      await redisClient.zadd(migrationsKey, Date.now(), migrationId);
+    } catch (err) {
+      console.error(`[BULLMQ] Migration ${migrationId} failed: ${err}`);
+      break;
+    }
+    console.log(`[BULLMQ] Migration ${migrationId} completed`);
+  }
+};
+
+function getRedisKeyFromOpts(opts: MigrationOptions, key: string): string {
+  return `${opts.prefix}:${opts.queueName}:${key}`;
+}
```
> **Review comment:** Do you think the default should be to perform the migration? I think it makes more sense to have the reverse: the default is not to do the migration, and you must do it explicitly. In fact, it would be better to give an error if the queue has not been migrated yet and require you to migrate manually first as part of your deployment steps.
>
> **Review comment:** So basically we should have only one way to migrate, which would be using the runMigrations utility, and not migrate implicitly when the queue is instantiated.
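Following that suggestion, a guard built on the `checkPendingMigrations` helper from this diff might look like the sketch below; the error message and the exact call site inside the instance constructors are assumptions:

```ts
import { RedisClient } from '../interfaces'; // type as used in this diff
import { checkPendingMigrations, MigrationOptions } from './migrations'; // placeholder path

// Hypothetical guard: instances would call this on startup and refuse
// to run until migrations have been executed explicitly.
async function assertNoPendingMigrations(
  client: RedisClient,
  opts: MigrationOptions,
): Promise<void> {
  if (await checkPendingMigrations(client, opts)) {
    throw new Error(
      `Queue "${opts.queueName}" has pending migrations. ` +
        'Run the runMigrations() utility as part of your deployment before ' +
        'instantiating Queue, Worker or QueueEvents.',
    );
  }
}
```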