Skip to content

Commit

Permalink
feat(job): add promote method [python] (#2323)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Dec 12, 2023
1 parent f3536dd commit 61f4ba3
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 7 deletions.
19 changes: 18 additions & 1 deletion python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def updateData(self, data):
self.data = data
return self.scripts.updateData(self.id, data)

async def promote(self):
await self.scripts.promote(self.id)
self.delay = 0

def retry(self, state: str = "failed"):
self.failedReason = None
self.finishedOn = None
Expand All @@ -81,10 +85,23 @@ def updateProgress(self, progress):

async def remove(self, opts: dict = {}):
removed = await self.scripts.remove(self.id, opts.get("removeChildren", True))

if not removed:
raise Exception(f"Job {self.id} could not be removed because it is locked by another worker")

def isDelayed(self):
return self.isInZSet('delayed')

async def isInZSet(self, set: str):
score = await self.queue.client.zscore(self.scripts.toKey(set), self.id)

return score is not None

async def isInZSet(self, set: str):
score = await self.queue.client.zscore(self.scripts.toKey(set), self.id)

return score is not None

async def moveToFailed(self, err, token:str, fetchNext:bool = False):
error_message = str(err)
self.failedReason = error_message
Expand Down
22 changes: 22 additions & 0 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-4.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-5.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-7.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-6.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-9.lua")),
Expand Down Expand Up @@ -282,6 +283,27 @@ async def moveToDelayed(self, job_id: str, timestamp: int, delay: int, token: st
raise self.finishedErrors(result, job_id, 'moveToDelayed', 'active')
return None

def promoteArgs(self, job_id: str):
keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'pc', 'events'])
keys.append(self.toKey(job_id))
keys.append(self.keys['events'])
keys.append(self.keys['paused'])
keys.append(self.keys['meta'])

args = [self.keys[''], job_id]

return (keys, args)

async def promote(self, job_id: str):
keys, args = self.promoteArgs(job_id)

result = await self.commands["promote"](keys=keys, args=args)

if result is not None:
if result < 0:
raise self.finishedErrors(result, job_id, 'promote', 'delayed')
return None

def remove(self, job_id: str, remove_children: bool):
keys = self.getKeys([''])
args = [job_id, 1 if remove_children else 0]
Expand Down
12 changes: 12 additions & 0 deletions python/tests/job_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,17 @@ async def test_update_job_data_when_is_removed(self):

await queue.close()

async def test_promote_delayed_job(self):
queue = Queue(queueName)
job = await queue.add("test", {"foo": "bar"}, {"delay": 1500})
isDelayed = await job.isDelayed()
self.assertEqual(isDelayed, True)
await job.promote()
self.assertEqual(job.delay, 0)
isDelayedAfterPromote = await job.isDelayed()
self.assertEqual(isDelayedAfterPromote, False)

await queue.close()

if __name__ == '__main__':
unittest.main()
7 changes: 3 additions & 4 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1055,10 +1055,9 @@ export class Job<
async promote(): Promise<void> {
const jobId = this.id;

const code = await this.scripts.promote(jobId);
if (code < 0) {
throw this.scripts.finishedErrors(code, this.id, 'promote', 'delayed');
}
await this.scripts.promote(jobId);

this.delay = 0;
}

/**
Expand Down
7 changes: 5 additions & 2 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ export class Scripts {
return raw2NextJobData(result);
}

async promote(jobId: string): Promise<number> {
async promote(jobId: string): Promise<void> {
const client = await this.queue.client;

const keys = [
Expand All @@ -959,7 +959,10 @@ export class Scripts {

const args = [this.queue.toKey(''), jobId];

return (<any>client).promote(keys.concat(args));
const code = await (<any>client).promote(keys.concat(args));
if (code < 0) {
throw this.finishedErrors(code, jobId, 'promote', 'delayed');
}
}

/**
Expand Down
1 change: 1 addition & 0 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,7 @@ describe('Job', function () {
const isDelayed = await job.isDelayed();
expect(isDelayed).to.be.equal(true);
await job.promote();
expect(job.delay).to.be.equal(0);

const isDelayedAfterPromote = await job.isDelayed();
expect(isDelayedAfterPromote).to.be.equal(false);
Expand Down

0 comments on commit 61f4ba3

Please sign in to comment.