diff --git a/python/bullmq/job.py b/python/bullmq/job.py index 6bd67ac0fb..ed433afa86 100644 --- a/python/bullmq/job.py +++ b/python/bullmq/job.py @@ -62,6 +62,10 @@ def updateData(self, data): self.data = data return self.scripts.updateData(self.id, data) + async def promote(self): + await self.scripts.promote(self.id) + self.delay = 0 + def retry(self, state: str = "failed"): self.failedReason = None self.finishedOn = None @@ -81,10 +85,23 @@ def updateProgress(self, progress): async def remove(self, opts: dict = {}): removed = await self.scripts.remove(self.id, opts.get("removeChildren", True)) - + if not removed: raise Exception(f"Job {self.id} could not be removed because it is locked by another worker") + def isDelayed(self): + return self.isInZSet('delayed') + + async def isInZSet(self, set: str): + score = await self.queue.client.zscore(self.scripts.toKey(set), self.id) + + return score is not None + + async def isInZSet(self, set: str): + score = await self.queue.client.zscore(self.scripts.toKey(set), self.id) + + return score is not None + async def moveToFailed(self, err, token:str, fetchNext:bool = False): error_message = str(err) self.failedReason = error_message diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index 7fcf6ec93f..68e86ac597 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -49,6 +49,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection "moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-4.lua")), "obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")), "pause": self.redisClient.register_script(self.getScript("pause-5.lua")), + "promote": self.redisClient.register_script(self.getScript("promote-7.lua")), "removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")), "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-6.lua")), "retryJob": self.redisClient.register_script(self.getScript("retryJob-9.lua")), @@ -282,6 +283,27 @@ async def moveToDelayed(self, job_id: str, timestamp: int, delay: int, token: st raise self.finishedErrors(result, job_id, 'moveToDelayed', 'active') return None + def promoteArgs(self, job_id: str): + keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'pc', 'events']) + keys.append(self.toKey(job_id)) + keys.append(self.keys['events']) + keys.append(self.keys['paused']) + keys.append(self.keys['meta']) + + args = [self.keys[''], job_id] + + return (keys, args) + + async def promote(self, job_id: str): + keys, args = self.promoteArgs(job_id) + + result = await self.commands["promote"](keys=keys, args=args) + + if result is not None: + if result < 0: + raise self.finishedErrors(result, job_id, 'promote', 'delayed') + return None + def remove(self, job_id: str, remove_children: bool): keys = self.getKeys(['']) args = [job_id, 1 if remove_children else 0] diff --git a/python/tests/job_tests.py b/python/tests/job_tests.py index 5b04e49e8a..bd620280ec 100644 --- a/python/tests/job_tests.py +++ b/python/tests/job_tests.py @@ -67,5 +67,17 @@ async def test_update_job_data_when_is_removed(self): await queue.close() + async def test_promote_delayed_job(self): + queue = Queue(queueName) + job = await queue.add("test", {"foo": "bar"}, {"delay": 1500}) + isDelayed = await job.isDelayed() + self.assertEqual(isDelayed, True) + await job.promote() + self.assertEqual(job.delay, 0) + isDelayedAfterPromote = await job.isDelayed() + self.assertEqual(isDelayedAfterPromote, False) + + await queue.close() + if __name__ == '__main__': unittest.main() diff --git a/src/classes/job.ts b/src/classes/job.ts index dfd6207039..bd89c33048 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -1055,10 +1055,9 @@ export class Job< async promote(): Promise { const jobId = this.id; - const code = await this.scripts.promote(jobId); - if (code < 0) { - throw this.scripts.finishedErrors(code, this.id, 'promote', 'delayed'); - } + await this.scripts.promote(jobId); + + this.delay = 0; } /** diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 2e26d604b5..e8d53cb618 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -944,7 +944,7 @@ export class Scripts { return raw2NextJobData(result); } - async promote(jobId: string): Promise { + async promote(jobId: string): Promise { const client = await this.queue.client; const keys = [ @@ -959,7 +959,10 @@ export class Scripts { const args = [this.queue.toKey(''), jobId]; - return (client).promote(keys.concat(args)); + const code = await (client).promote(keys.concat(args)); + if (code < 0) { + throw this.finishedErrors(code, jobId, 'promote', 'delayed'); + } } /** diff --git a/tests/test_job.ts b/tests/test_job.ts index 232de72f21..13e46f878d 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -1076,6 +1076,7 @@ describe('Job', function () { const isDelayed = await job.isDelayed(); expect(isDelayed).to.be.equal(true); await job.promote(); + expect(job.delay).to.be.equal(0); const isDelayedAfterPromote = await job.isDelayed(); expect(isDelayedAfterPromote).to.be.equal(false);