diff --git a/README.md b/README.md
index cdb5a82..b1a2838 100644
--- a/README.md
+++ b/README.md
@@ -177,6 +177,10 @@ A: Adds a `logging.DEBUG` level handler to `delayed.logger.logger`. The simplest
 
 ## Release notes
 
+* 1.1:
+    1. Adds `log_level` param to `delayed.logger.setup_logger()`.
+    2. Prevents different online workers from having the same ID.
+
 * 1.0:
     1. Python 2.7 is not supported anymore. (BREAKING CHANGE)
     2. Supports Go, adds `GoTask`.
diff --git a/delayed/__init__.py b/delayed/__init__.py
index 321d147..f9e564f 100644
--- a/delayed/__init__.py
+++ b/delayed/__init__.py
@@ -1,3 +1,3 @@
 # -*- coding: utf-8 -*-
 
-__version__ = '1.0.1b2'
+__version__ = '1.1.0b1'
diff --git a/delayed/constants.py b/delayed/constants.py
index 3c5e2f9..b538505 100644
--- a/delayed/constants.py
+++ b/delayed/constants.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 
 
-class Status(object):
+class Status:
     (
         STOPPED,
         RUNNING,
diff --git a/delayed/delay.py b/delayed/delay.py
index 7cdd4e9..17aee36 100644
--- a/delayed/delay.py
+++ b/delayed/delay.py
@@ -2,10 +2,11 @@
 
 from functools import wraps
 
+from .queue import Queue
 from .task import PyTask
 
 
-def delayed(queue):
+def delayed(queue: Queue):
     """A decorator for defining task functions.
     Calling a delayed function is equivalent to call the raw function.
     Calling the delay() method of a delayed function will enqueue a task.
diff --git a/delayed/keep_alive.py b/delayed/keep_alive.py
index 0874475..310a044 100644
--- a/delayed/keep_alive.py
+++ b/delayed/keep_alive.py
@@ -1,11 +1,15 @@
 import threading
+from typing import TYPE_CHECKING
 
 from .constants import Status
 from .logger import logger
 
+if TYPE_CHECKING:
+    from .worker import Worker
+
 
 class KeepAliveThread(threading.Thread):
-    def __init__(self, worker):
+    def __init__(self, worker: 'Worker'):
         super(KeepAliveThread, self).__init__()
 
         self._worker = worker
@@ -20,4 +24,4 @@ def run(self):
                 logger.exception('Failed to keep alive.')
             with worker._cond:
                 worker._cond.wait(interval)
-        queue._die()
+        queue.go_offline()
diff --git a/delayed/logger.py b/delayed/logger.py
index 11fc11f..7b55c96 100644
--- a/delayed/logger.py
+++ b/delayed/logger.py
@@ -10,7 +10,7 @@
 logger.addHandler(_null_handler)
 
 
-def set_handler(handler):
+def set_handler(handler: logging.Handler):
     """Set the handler of the logger.
 
     Args:
@@ -19,31 +19,42 @@ def set_handler(handler):
     logger.handlers = [handler]
 
 
-def setup_logger(date_format: str = DEFAULT_DATE_FORMAT, log_format: str = DEFAULT_LOG_FORMAT):
+def setup_logger(
+    date_format: str = DEFAULT_DATE_FORMAT,
+    log_format: str = DEFAULT_LOG_FORMAT,
+    log_level: int = logging.DEBUG
+):
     """Setup a console logger.
 
     Args:
         date_format (str): The date format of the logger.
         log_format (str): The log format of the logger.
+        log_level (int): The log level of the logger.
     """
     logger.removeHandler(_null_handler)
     if logger.handlers:
         for handler in logger.handlers:
-            _setup_handler(handler, date_format, log_format)
+            _setup_handler(handler, date_format, log_format, log_level)
     else:
         handler = logging.StreamHandler()
-        _setup_handler(handler, date_format, log_format)
+        _setup_handler(handler, date_format, log_format, log_level)
         logger.addHandler(handler)
 
 
-def _setup_handler(handler, date_format: str, log_format: str):
+def _setup_handler(
+    handler: logging.Handler,
+    date_format: str,
+    log_format: str,
+    log_level: int = logging.DEBUG
+):
     """Setup a handler for the logger.
 
     Args:
         handler (logging.Handler): The handler to be setup.
         date_format (str): The date format of the handler.
         log_format (str): The log format of the handler.
+        log_level (int): The log level of the handler.
     """
-    handler.setLevel(logging.DEBUG)
+    handler.setLevel(log_level)
     formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
     handler.setFormatter(formatter)
diff --git a/delayed/queue.py b/delayed/queue.py
index 6cf8aa7..d8a441b 100644
--- a/delayed/queue.py
+++ b/delayed/queue.py
@@ -1,11 +1,12 @@
 # -*- coding: utf-8 -*-
 
+from math import ceil
 from typing import Optional, Union
 
 import redis
 
 from .logger import logger
-from .task import PyTask
+from .task import GoTask, PyTask
 
 
 # KEYS: queue_name, processing_key
@@ -55,8 +56,13 @@ class Queue:
         keep_alive_timeout (int or float): The keep alive timeout in seconds of the worker.
     """
 
-    def __init__(self, name: str, conn: redis.Redis, dequeue_timeout: Union[int, float] = 1, keep_alive_timeout: Union[int, float] = 60):
-        self._worker_id: Optional[bytes] = None
+    def __init__(
+        self, name: str,
+        conn: redis.Redis,
+        dequeue_timeout: Union[int, float] = 1,
+        keep_alive_timeout: Union[int, float] = 60
+    ):
+        self._worker_id: Optional[int] = None
         self._name = name
         self._noti_key = name + _NOTI_KEY_SUFFIX
         self._processing_key = name + _PROCESSING_KEY_SUFFIX
@@ -66,7 +72,7 @@ def __init__(self, name: str, conn: redis.Redis, dequeue_timeout: Union[int, flo
         self._dequeue_script = conn.register_script(_DEQUEUE_SCRIPT)
         self._requeue_lost_script = conn.register_script(_REQUEUE_LOST_SCRIPT)
 
-    def enqueue(self, task: PyTask):
+    def enqueue(self, task: Union[GoTask, PyTask]):
         """Enqueues a task to the queue.
 
         Args:
@@ -74,11 +80,14 @@ def enqueue(self, task: PyTask):
         """
         logger.debug('Enqueuing task %s.', task._func_path)
         data = task.serialize()
-        with self._conn.pipeline() as pipe:
-            pipe.rpush(self._name, data)
-            pipe.rpush(self._noti_key, '1')
-            pipe.execute()
-        logger.debug('Enqueued task %s.', task._func_path)
+        if data:
+            with self._conn.pipeline() as pipe:
+                pipe.rpush(self._name, data)
+                pipe.rpush(self._noti_key, '1')
+                pipe.execute()
+            logger.debug('Enqueued task %s.', task._func_path)
+        else:
+            logger.error('Failed to serialize task %s.', task._func_path)
 
     def dequeue(self) -> Optional[PyTask]:
         """Dequeues a task from the queue.
@@ -116,7 +125,7 @@ def requeue_lost(self) -> int:
         Returns:
             int: The requeued task count.
""" - count = self._requeue_lost_script( + count: int = self._requeue_lost_script( keys=(self._name, self._noti_key, self._processing_key)) if count >= 1: if count == 1: @@ -130,7 +139,14 @@ def keep_alive(self): self._conn.setex(self._worker_id, self._keep_alive_timeout, 1) logger.debug('Worker %s is alive.', self._worker_id) - def _die(self): - """Set the worker of the queue dead.""" + def go_offline(self): + """Set the worker of the queue offline.""" self._conn.delete(self._worker_id) - logger.debug('Worker %s dies.', self._worker_id) + logger.debug('Worker %s is offline.', self._worker_id) + + def try_online(self) -> bool: + """Try to set the worker of the queue online.""" + if self._conn.set(self._worker_id, 1, ex=ceil(self._keep_alive_timeout), nx=True): + logger.debug('Worker %s is online.', self._worker_id) + return True + return False diff --git a/delayed/task.py b/delayed/task.py index d42e24b..777ee23 100644 --- a/delayed/task.py +++ b/delayed/task.py @@ -22,7 +22,12 @@ class PyTask: __slots__ = ['_func_path', '_args', '_kwargs', '_data'] - def __init__(self, func: Union[Callable, str], args: Union[list, tuple, None] = None, kwargs: Optional[dict] = None): + def __init__( + self, + func: Union[Callable, str], + args: Union[list, tuple, None] = None, + kwargs: Optional[dict] = None + ): if isinstance(func, str): self._func_path = func elif callable(func): @@ -91,7 +96,7 @@ class GoTask: __slots__ = ['_func_path', '_args', '_payload', '_data'] - def __init__(self, func_path: str, args: Optional[list] = None): + def __init__(self, func_path: str, args: Any = None): self._func_path = func_path self._args = args self._payload = None diff --git a/delayed/worker.py b/delayed/worker.py index e0fb92c..0d87bf9 100644 --- a/delayed/worker.py +++ b/delayed/worker.py @@ -1,12 +1,10 @@ # -*- coding: utf-8 -*- -import binascii -import os +from secrets import randbits import signal -import sys -import threading -import time -import sys +from sys import exc_info +from threading import Condition +from time import sleep from typing import Union from .constants import DEFAULT_SLEEP_TIME, MAX_SLEEP_TIME, Status @@ -21,18 +19,33 @@ class Worker: Args: queue (delayed.queue.Queue): The task queue of the worker. keep_alive_interval (int or float): The worker marks itself as alive for every `keep_alive_interval` seconds. + id_bits (int): The bits used for the worker ID. The default value is 16, which is enough for 2 ** 16 workers. 
""" - def __init__(self, queue: Queue, keep_alive_interval: Union[int, float] = 15): - queue._worker_id = self._id = binascii.hexlify(os.urandom(16)) + def __init__( + self, + queue: Queue, + keep_alive_interval: Union[int, float] = 15, + id_bits: int = 16 + ): self._queue = queue self._keep_alive_interval = keep_alive_interval + self._id_bits = id_bits self._status = Status.STOPPED - self._cond = threading.Condition() + self._cond = Condition() + + def generate_id(self): + """Generates a random ID for the worker.""" + while True: + self._queue._worker_id = self._id = randbits(self._id_bits) + if self._queue.try_online(): + return def run(self): # pragma: no cover """Runs the worker.""" - logger.debug('Starting worker %s.', self._id) + self.generate_id() + + logger.debug('Starting worker %d.', self._id) self._status = Status.RUNNING self._register_signals() @@ -46,7 +59,7 @@ def run(self): # pragma: no cover task = self._queue.dequeue() except Exception: # pragma: no cover logger.exception('Failed to dequeue task.') - time.sleep(sleep_time) + sleep(sleep_time) sleep_time *= 2 if sleep_time > MAX_SLEEP_TIME: sleep_time = MAX_SLEEP_TIME @@ -59,7 +72,7 @@ def run(self): # pragma: no cover logger.exception('Failed to execute task %s.', task._func_path) need_retry = False - _, _, exc_traceback = sys.exc_info() + _, _, exc_traceback = exc_info() if exc_traceback: tb_next = exc_traceback.tb_next if tb_next: @@ -85,12 +98,12 @@ def run(self): # pragma: no cover with self._cond: self._cond.notify() thread.join() - logger.debug('Stopped worker %s.', self._id) + logger.debug('Stopped worker %d.', self._id) def stop(self): """Stops the worker.""" if self._status == Status.RUNNING: - logger.debug('Stopping worker %s.', self._id) + logger.debug('Stopping worker %d.', self._id) self._status = Status.STOPPING def _requeue_task(self, task: PyTask): @@ -110,7 +123,7 @@ def _release_task(self): try: self._queue.release() except Exception: # pragma: no cover - logger.exception('Failed to release task of worker %s.', self._id) + logger.exception('Failed to release task of worker %d.', self._id) def _register_signals(self): """Registers signal handlers.""" diff --git a/tests/common.py b/tests/common.py index e4c8b83..64259fa 100644 --- a/tests/common.py +++ b/tests/common.py @@ -16,6 +16,8 @@ WORKER = Worker(QUEUE) DELAYED = delayed(QUEUE) +WORKER.generate_id() + def func(a, b): return a + b diff --git a/tests/test_delay.py b/tests/test_delay.py index 1709916..679d65e 100644 --- a/tests/test_delay.py +++ b/tests/test_delay.py @@ -17,5 +17,6 @@ def test_delayed(): delayed_func.delay(1, 2) assert QUEUE.len() == 1 task = QUEUE.dequeue() + assert task is not None assert task.execute() == 3 QUEUE.release() diff --git a/tests/test_queue.py b/tests/test_queue.py index 8578154..b6216cb 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,11 +1,12 @@ # -*- coding: utf-8 -*- +from delayed.queue import Queue from delayed.task import GoTask, PyTask from .common import CONN, func, NOTI_KEY, PROCESSING_KEY, QUEUE, QUEUE_NAME -class TestQueue(object): +class TestQueue: def test_enqueue(self): CONN.delete(QUEUE_NAME, NOTI_KEY) @@ -39,6 +40,7 @@ def test_dequeue(self): QUEUE.enqueue(task3) task = QUEUE.dequeue() + assert task is not None assert CONN.llen(QUEUE_NAME) == 2 assert CONN.llen(NOTI_KEY) == 2 assert CONN.hget(PROCESSING_KEY, QUEUE._worker_id) == task._data @@ -47,6 +49,7 @@ def test_dequeue(self): assert task._kwargs == {} task = QUEUE.dequeue() + assert task is not None assert CONN.llen(QUEUE_NAME) == 1 
         assert CONN.llen(NOTI_KEY) == 1
         assert CONN.hget(PROCESSING_KEY, QUEUE._worker_id) == task._data
@@ -55,6 +58,7 @@ def test_dequeue(self):
         assert task._kwargs == {'b': 4}
 
         task = QUEUE.dequeue()
+        assert task is not None
         assert CONN.llen(QUEUE_NAME) == 0
         assert CONN.llen(NOTI_KEY) == 0
         assert CONN.hget(PROCESSING_KEY, QUEUE._worker_id) == task._data
@@ -100,13 +104,33 @@ def test_requeue_lost(self):
         assert QUEUE.requeue_lost() == 0
 
         QUEUE.dequeue()
+        # WORKER.generate_id() sets the queue online for at least 1 second.
+        # It prevents the queue from going offline in the middle of the test.
+        QUEUE.go_offline()
         assert QUEUE.requeue_lost() == 1
         assert QUEUE.len() == 1
 
         QUEUE.keep_alive()
         QUEUE.dequeue()
         assert QUEUE.requeue_lost() == 0
-        QUEUE._die()
+        QUEUE.go_offline()
         assert QUEUE.requeue_lost() == 1
 
         CONN.delete(QUEUE_NAME, NOTI_KEY, PROCESSING_KEY)
+
+    def test_try_online(self):
+        QUEUE.go_offline()
+        assert QUEUE.try_online()
+        assert not QUEUE.try_online()
+
+        QUEUE.go_offline()
+        assert QUEUE.try_online()
+
+        queue = Queue(QUEUE_NAME, CONN, 0.01)
+        queue._worker_id = QUEUE._worker_id
+        assert not queue.try_online()
+
+        QUEUE.go_offline()
+        assert queue.try_online()
+
+        queue.go_offline()
diff --git a/tests/test_sweeper.py b/tests/test_sweeper.py
index 84c268b..7225fed 100644
--- a/tests/test_sweeper.py
+++ b/tests/test_sweeper.py
@@ -10,14 +10,14 @@
 from .common import CONN, func, QUEUE, QUEUE_NAME, NOTI_KEY
 
 
-class TestSweeper(object):
+class TestSweeper:
     def test_run(self):
         queue_name = 'test'
         noti_key = queue_name + _NOTI_KEY_SUFFIX
         CONN.delete(QUEUE_NAME, NOTI_KEY, queue_name, noti_key)
 
         queue = Queue('test', CONN, 0.01)
-        queue._worker_id = 'test'
+        queue._worker_id = 123
         sweeper = Sweeper([QUEUE, queue], 0.05)
 
         task = PyTask(func, (1, 2))
diff --git a/tests/test_task.py b/tests/test_task.py
index 44222de..8b5969e 100644
--- a/tests/test_task.py
+++ b/tests/test_task.py
@@ -5,7 +5,7 @@
 from .common import func
 
 
-class TestPyTask(object):
+class TestPyTask:
     def test_create(self):
         task = PyTask(func, (1, 2))
         assert task._func_path == 'tests.common:func'
@@ -44,7 +44,7 @@ def test_execute(self):
         assert task.execute() == 3
 
 
-class TestGoTask(object):
+class TestGoTask:
     def test_create(self):
         task = GoTask('test.Func', (1, 2))
         assert task._func_path == 'test.Func'
diff --git a/tests/test_worker.py b/tests/test_worker.py
index aba8f6b..98febc4 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -4,15 +4,36 @@
 import signal
 
 from delayed.task import PyTask
+from delayed.queue import Queue
+from delayed.worker import Worker
 
 from .common import CONN, NOTI_KEY, PROCESSING_KEY, QUEUE, QUEUE_NAME, WORKER
 
 
-def stop(pid):
+def stop(pid: int):
     os.kill(pid, signal.SIGHUP)
 
 
-class TestWorker(object):
+class TestWorker:
+    def test_generate_id(self):
+        QUEUE.go_offline()  # make sure no worker IDs are in use
+
+        queue1 = Queue(QUEUE_NAME, conn=CONN)
+        worker1 = Worker(queue1, id_bits=1)
+        worker1.generate_id()
+        assert worker1._id is not None
+        assert 0 <= worker1._id < 2
+
+        queue2 = Queue(QUEUE_NAME, conn=CONN)
+        worker2 = Worker(queue2, id_bits=1)
+        worker2.generate_id()
+        assert worker2._id is not None
+        assert 0 <= worker2._id < 2
+        assert worker1._id != worker2._id
+
+        queue1.go_offline()
+        queue2.go_offline()
+
     def test_run(self):
         CONN.delete(QUEUE_NAME, NOTI_KEY, PROCESSING_KEY)