Skip to content

Commit

Permalink
1. Adds log_level param to delayed.logger.setup_logger().
Browse files Browse the repository at this point in the history
2. Prevents different online workers have the same id.
  • Loading branch information
keakon committed Mar 29, 2024
1 parent 4a3b541 commit 58a5b9d
Show file tree
Hide file tree
Showing 15 changed files with 151 additions and 49 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ A: Adds a `logging.DEBUG` level handler to `delayed.logger.logger`. The simplest
## Release notes
* 1.1:
1. Adds `log_level` param to `delayed.logger.setup_logger()`.
2. Prevents different online workers have the same id.
* 1.0:
1. Python 2.7 is not supported anymore. (BREAKING CHANGE)
2. Supports Go, adds `GoTask`.
Expand Down
2 changes: 1 addition & 1 deletion delayed/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# -*- coding: utf-8 -*-

__version__ = '1.0.1b2'
__version__ = '1.1.0b1'
2 changes: 1 addition & 1 deletion delayed/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-


class Status(object):
class Status:
(
STOPPED,
RUNNING,
Expand Down
3 changes: 2 additions & 1 deletion delayed/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

from functools import wraps

from .queue import Queue
from .task import PyTask


def delayed(queue):
def delayed(queue: Queue):
"""A decorator for defining task functions.
Calling a delayed function is equivalent to call the raw function.
Calling the delay() method of a delayed function will enqueue a task.
Expand Down
8 changes: 6 additions & 2 deletions delayed/keep_alive.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import threading
from typing import TYPE_CHECKING

from .constants import Status
from .logger import logger

if TYPE_CHECKING:
from .worker import Worker


class KeepAliveThread(threading.Thread):
def __init__(self, worker):
def __init__(self, worker: 'Worker'):
super(KeepAliveThread, self).__init__()
self._worker = worker

Expand All @@ -20,4 +24,4 @@ def run(self):
logger.exception('Failed to keep alive.')
with worker._cond:
worker._cond.wait(interval)
queue._die()
queue.go_offline()
23 changes: 17 additions & 6 deletions delayed/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
logger.addHandler(_null_handler)


def set_handler(handler):
def set_handler(handler: logging.Handler):
"""Set the handler of the logger.
Args:
Expand All @@ -19,31 +19,42 @@ def set_handler(handler):
logger.handlers = [handler]


def setup_logger(date_format: str = DEFAULT_DATE_FORMAT, log_format: str = DEFAULT_LOG_FORMAT):
def setup_logger(
date_format: str = DEFAULT_DATE_FORMAT,
log_format: str = DEFAULT_LOG_FORMAT,
log_level: int = logging.DEBUG
):
"""Setup a console logger.
Args:
date_format (str): The date format of the logger.
log_format (str): The log format of the logger.
log_level (int): The log level of the logger.
"""
logger.removeHandler(_null_handler)
if logger.handlers:
for handler in logger.handlers:
_setup_handler(handler, date_format, log_format)
_setup_handler(handler, date_format, log_format, log_level)
else:
handler = logging.StreamHandler()
_setup_handler(handler, date_format, log_format)
_setup_handler(handler, date_format, log_format, log_level)
logger.addHandler(handler)


def _setup_handler(handler, date_format: str, log_format: str):
def _setup_handler(
handler: logging.Handler,
date_format: str,
log_format: str,
log_level: int = logging.DEBUG
):
"""Setup a handler for the logger.
Args:
handler (logging.Handler): The handler to be setup.
date_format (str): The date format of the handler.
log_format (str): The log format of the handler.
log_level (int): The log level of the handler.
"""
handler.setLevel(logging.DEBUG)
handler.setLevel(log_level)
formatter = logging.Formatter(fmt=log_format, datefmt=date_format)
handler.setFormatter(formatter)
42 changes: 29 additions & 13 deletions delayed/queue.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# -*- coding: utf-8 -*-

from math import ceil
from typing import Optional, Union

import redis

from .logger import logger
from .task import PyTask
from .task import GoTask, PyTask


# KEYS: queue_name, processing_key
Expand Down Expand Up @@ -55,8 +56,13 @@ class Queue:
keep_alive_timeout (int or float): The keep alive timeout in seconds of the worker.
"""

def __init__(self, name: str, conn: redis.Redis, dequeue_timeout: Union[int, float] = 1, keep_alive_timeout: Union[int, float] = 60):
self._worker_id: Optional[bytes] = None
def __init__(
self, name: str,
conn: redis.Redis,
dequeue_timeout: Union[int, float] = 1,
keep_alive_timeout: Union[int, float] = 60
):
self._worker_id: Optional[int] = None
self._name = name
self._noti_key = name + _NOTI_KEY_SUFFIX
self._processing_key = name + _PROCESSING_KEY_SUFFIX
Expand All @@ -66,19 +72,22 @@ def __init__(self, name: str, conn: redis.Redis, dequeue_timeout: Union[int, flo
self._dequeue_script = conn.register_script(_DEQUEUE_SCRIPT)
self._requeue_lost_script = conn.register_script(_REQUEUE_LOST_SCRIPT)

def enqueue(self, task: PyTask):
def enqueue(self, task: Union[GoTask, PyTask]):
"""Enqueues a task to the queue.
Args:
task (delayed.task.PyTask or delayed.task.GoTask): The task to be enqueued.
"""
logger.debug('Enqueuing task %s.', task._func_path)
data = task.serialize()
with self._conn.pipeline() as pipe:
pipe.rpush(self._name, data)
pipe.rpush(self._noti_key, '1')
pipe.execute()
logger.debug('Enqueued task %s.', task._func_path)
if data:
with self._conn.pipeline() as pipe:
pipe.rpush(self._name, data)
pipe.rpush(self._noti_key, '1')
pipe.execute()
logger.debug('Enqueued task %s.', task._func_path)
else:
logger.error('Failed to serialize task %s.', task._func_path)

def dequeue(self) -> Optional[PyTask]:
"""Dequeues a task from the queue.
Expand Down Expand Up @@ -116,7 +125,7 @@ def requeue_lost(self) -> int:
Returns:
int: The requeued task count.
"""
count = self._requeue_lost_script(
count: int = self._requeue_lost_script(
keys=(self._name, self._noti_key, self._processing_key))
if count >= 1:
if count == 1:
Expand All @@ -130,7 +139,14 @@ def keep_alive(self):
self._conn.setex(self._worker_id, self._keep_alive_timeout, 1)
logger.debug('Worker %s is alive.', self._worker_id)

def _die(self):
"""Set the worker of the queue dead."""
def go_offline(self):
"""Set the worker of the queue offline."""
self._conn.delete(self._worker_id)
logger.debug('Worker %s dies.', self._worker_id)
logger.debug('Worker %s is offline.', self._worker_id)

def try_online(self) -> bool:
"""Try to set the worker of the queue online."""
if self._conn.set(self._worker_id, 1, ex=ceil(self._keep_alive_timeout), nx=True):
logger.debug('Worker %s is online.', self._worker_id)
return True
return False
9 changes: 7 additions & 2 deletions delayed/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ class PyTask:

__slots__ = ['_func_path', '_args', '_kwargs', '_data']

def __init__(self, func: Union[Callable, str], args: Union[list, tuple, None] = None, kwargs: Optional[dict] = None):
def __init__(
self,
func: Union[Callable, str],
args: Union[list, tuple, None] = None,
kwargs: Optional[dict] = None
):
if isinstance(func, str):
self._func_path = func
elif callable(func):
Expand Down Expand Up @@ -91,7 +96,7 @@ class GoTask:

__slots__ = ['_func_path', '_args', '_payload', '_data']

def __init__(self, func_path: str, args: Optional[list] = None):
def __init__(self, func_path: str, args: Any = None):
self._func_path = func_path
self._args = args
self._payload = None
Expand Down
43 changes: 28 additions & 15 deletions delayed/worker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
# -*- coding: utf-8 -*-

import binascii
import os
from secrets import randbits
import signal
import sys
import threading
import time
import sys
from sys import exc_info
from threading import Condition
from time import sleep
from typing import Union

from .constants import DEFAULT_SLEEP_TIME, MAX_SLEEP_TIME, Status
Expand All @@ -21,18 +19,33 @@ class Worker:
Args:
queue (delayed.queue.Queue): The task queue of the worker.
keep_alive_interval (int or float): The worker marks itself as alive for every `keep_alive_interval` seconds.
id_bits (int): The bits used for the worker ID. The default value is 16, which is enough for 2 ** 16 workers.
"""

def __init__(self, queue: Queue, keep_alive_interval: Union[int, float] = 15):
queue._worker_id = self._id = binascii.hexlify(os.urandom(16))
def __init__(
self,
queue: Queue,
keep_alive_interval: Union[int, float] = 15,
id_bits: int = 16
):
self._queue = queue
self._keep_alive_interval = keep_alive_interval
self._id_bits = id_bits
self._status = Status.STOPPED
self._cond = threading.Condition()
self._cond = Condition()

def generate_id(self):
"""Generates a random ID for the worker."""
while True:
self._queue._worker_id = self._id = randbits(self._id_bits)
if self._queue.try_online():
return

def run(self): # pragma: no cover
"""Runs the worker."""
logger.debug('Starting worker %s.', self._id)
self.generate_id()

logger.debug('Starting worker %d.', self._id)
self._status = Status.RUNNING
self._register_signals()

Expand All @@ -46,7 +59,7 @@ def run(self): # pragma: no cover
task = self._queue.dequeue()
except Exception: # pragma: no cover
logger.exception('Failed to dequeue task.')
time.sleep(sleep_time)
sleep(sleep_time)
sleep_time *= 2
if sleep_time > MAX_SLEEP_TIME:
sleep_time = MAX_SLEEP_TIME
Expand All @@ -59,7 +72,7 @@ def run(self): # pragma: no cover
logger.exception('Failed to execute task %s.', task._func_path)

need_retry = False
_, _, exc_traceback = sys.exc_info()
_, _, exc_traceback = exc_info()
if exc_traceback:
tb_next = exc_traceback.tb_next
if tb_next:
Expand All @@ -85,12 +98,12 @@ def run(self): # pragma: no cover
with self._cond:
self._cond.notify()
thread.join()
logger.debug('Stopped worker %s.', self._id)
logger.debug('Stopped worker %d.', self._id)

def stop(self):
"""Stops the worker."""
if self._status == Status.RUNNING:
logger.debug('Stopping worker %s.', self._id)
logger.debug('Stopping worker %d.', self._id)
self._status = Status.STOPPING

def _requeue_task(self, task: PyTask):
Expand All @@ -110,7 +123,7 @@ def _release_task(self):
try:
self._queue.release()
except Exception: # pragma: no cover
logger.exception('Failed to release task of worker %s.', self._id)
logger.exception('Failed to release task of worker %d.', self._id)

def _register_signals(self):
"""Registers signal handlers."""
Expand Down
2 changes: 2 additions & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
WORKER = Worker(QUEUE)
DELAYED = delayed(QUEUE)

WORKER.generate_id()


def func(a, b):
return a + b
1 change: 1 addition & 0 deletions tests/test_delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ def test_delayed():
delayed_func.delay(1, 2)
assert QUEUE.len() == 1
task = QUEUE.dequeue()
assert task is not None
assert task.execute() == 3
QUEUE.release()
Loading

0 comments on commit 58a5b9d

Please sign in to comment.