diff --git a/python/bullmq/queue.py b/python/bullmq/queue.py index 44a54355ed..5489db08f9 100644 --- a/python/bullmq/queue.py +++ b/python/bullmq/queue.py @@ -1,4 +1,5 @@ import asyncio +from bullmq.event_emitter import EventEmitter from bullmq.redis_connection import RedisConnection from bullmq.types import QueueBaseOptions, RetryJobsOptions, JobOptions, PromoteJobsOptions from bullmq.utils import extract_result @@ -6,7 +7,7 @@ from bullmq.job import Job -class Queue: +class Queue(EventEmitter): """ Instantiate a Queue object """ @@ -265,7 +266,7 @@ async def getJobs(self, types, start=0, end=-1, asc:bool=False): job_ids = await self.scripts.getRanges(current_types, start, end, asc) tasks = [asyncio.create_task(Job.fromId(self, i)) for i in job_ids] job_set, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) - jobs = [extract_result(job_task) for job_task in job_set] + jobs = [extract_result(job_task, self.emit) for job_task in job_set] jobs_len = len(jobs) # we filter `None` out to remove: diff --git a/python/bullmq/timer.py b/python/bullmq/timer.py index 30a7d27728..de981ded0d 100644 --- a/python/bullmq/timer.py +++ b/python/bullmq/timer.py @@ -4,11 +4,12 @@ class Timer: - def __init__(self, interval: int, callback, *args, **kwargs): + def __init__(self, interval: int, callback, emit_callback, *args, **kwargs): self.interval = interval self.args = args self.kwargs = kwargs self.callback = callback + self.emit = emit_callback self._ok = True self._task = asyncio.ensure_future(self._job()) @@ -17,8 +18,9 @@ async def _job(self): while self._ok: await asyncio.sleep(self.interval) await self.callback(*self.args, **self.kwargs) - except Exception as ex: - print(ex) + except Exception as err: + self.emit("error", err) + pass def stop(self): self._ok = False diff --git a/python/bullmq/utils.py b/python/bullmq/utils.py index 852dd1c2fc..70b00a288b 100644 --- a/python/bullmq/utils.py +++ b/python/bullmq/utils.py @@ -4,15 +4,15 @@ def isRedisVersionLowerThan(current_version, minimum_version): return semver.VersionInfo.parse(current_version).compare(minimum_version) == -1 -def extract_result(job_task): +def extract_result(job_task, emit_callback): try: return job_task.result() except Exception as e: if not str(e).startswith('Connection closed by server'): # lets use a simple-but-effective error handling: - # print error message and ignore the job - print("ERROR:", e) + # ignore the job traceback.print_exc() + emit_callback("error", e) def get_parent_key(opts: dict): if opts: diff --git a/python/bullmq/worker.py b/python/bullmq/worker.py index 803196e5d1..a0e33404cc 100644 --- a/python/bullmq/worker.py +++ b/python/bullmq/worker.py @@ -61,9 +61,9 @@ async def run(self): raise Exception("Worker is already running") self.timer = Timer( - (self.opts.get("lockDuration") / 2) / 1000, self.extendLocks) + (self.opts.get("lockDuration") / 2) / 1000, self.extendLocks, self.emit) self.stalledCheckTimer = Timer(self.opts.get( - "stalledInterval") / 1000, self.runStalledJobsCheck) + "stalledInterval") / 1000, self.runStalledJobsCheck, self.emit) self.running = True jobs = [] @@ -77,7 +77,7 @@ async def run(self): self.processing.add(waiting_job) try: - jobs, pending = await getCompleted(self.processing) + jobs, pending = await getCompleted(self.processing, self.emit) jobs_to_process = [self.processJob(job, job.token) for job in jobs] processing_jobs = [asyncio.ensure_future( @@ -91,7 +91,6 @@ async def run(self): except Exception as e: # This should never happen or we will have an endless loop - print("ERROR:", e) traceback.print_exc() return @@ -200,13 +199,11 @@ async def processJob(self, job: Job, token: str): return except Exception as err: try: - print("Error processing job", err) if not self.forceClosing: await job.moveToFailed(err, token) self.emit("failed", job, err) except Exception as err: - print("Error moving job to failed", err) self.emit("error", err, job) finally: self.jobs.remove((job, token)) @@ -225,7 +222,6 @@ async def extendLocks(self): # self.emit("error", "could not renew lock for job " + jobId) except Exception as e: - print("Error renewing locks", e) traceback.print_exc() async def runStalledJobsCheck(self): @@ -238,7 +234,6 @@ async def runStalledJobsCheck(self): self.emit("stalled", jobId) except Exception as e: - print("Error checking stalled jobs", e) self.emit('error', e) async def close(self, force: bool = False): @@ -264,9 +259,9 @@ def cancelProcessing(self): job.cancel() -async def getCompleted(task_set: set) -> tuple[list[Job], set]: +async def getCompleted(task_set: set, emit_callback) -> tuple[list[Job], set]: job_set, pending = await asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED) - jobs = [extract_result(job_task) for job_task in job_set] + jobs = [extract_result(job_task, emit_callback) for job_task in job_set] # we filter `None` out to remove: # a) an empty 'completed jobs' list; and # b) a failed extract_result