Skip to content

Commit

Permalink
Merge pull request #8 from RHDZMOTA/DEV
Browse files Browse the repository at this point in the history
2022-09-06 RELEASE-PRD (1)
  • Loading branch information
RHDZMOTA authored Sep 6, 2022
2 parents 8ee0da6 + 58b2dc6 commit 8a2ff23
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 6 deletions.
2 changes: 2 additions & 0 deletions requirements-extras.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ipython==8.4.0
celery[redis]==5.1.2
2 changes: 1 addition & 1 deletion src/rhdzmota/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.2.0
2.0.0
12 changes: 12 additions & 0 deletions src/rhdzmota/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from celery import Celery

from .settings import (
RHDZMOTA_CELERY_BROKER_HOST
)


# Celery application instance
app = Celery(
"tasks",
broker=f"redis://{RHDZMOTA_CELERY_BROKER_HOST}",
)
18 changes: 18 additions & 0 deletions src/rhdzmota/celery_worker_hello.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

from .celery import app
from .heartbeat.daemon import Daemon


@app.task
def daemon_publisher(**kwargs):
print(kwargs)


# Daemon instance
daemon = Daemon(name="hello-world", publisher=daemon_publisher)
daemon.broadcast()


@app.task
def worker(name: str) -> str:
return f"Hello, {name or 'world'}!"
18 changes: 14 additions & 4 deletions src/rhdzmota/cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from typing import Optional

from .settings import logger_manager
Expand All @@ -10,11 +11,20 @@

class CLI(CLIBase):

@CLIBase.Formatter()
def hello(self, world: Optional[str] = None):
logger.debug("CLI Hello command execution.")
@CLIBase.Formatter(default=str)
def hello(self, world: Optional[str] = None, sleep: int = 1, delegate: bool = False):
world = world or "world"
return f"Hello, {world}!"
time.sleep(sleep)
try:
from .celery_worker_hello import worker

if delegate:
return worker.delay(name=world)

logger.debug("CLI Hello command execution.")
return worker(name=world)
except ImportError:
return f"Hello, {world}!"

@CLIBase.Formatter()
def gist(
Expand Down
Empty file.
73 changes: 73 additions & 0 deletions src/rhdzmota/heartbeat/daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from threading import Thread
from typing import Any, Callable, Optional, Union, TYPE_CHECKING

from .daemon_heart import DaemonHeart


if TYPE_CHECKING:
# TODO: Quickfix guided by https://stackoverflow.com/questions/63714223/correct-type-annotation-for-a-celery-task
from celery.task import Task
from celery.local import PromiseProxy

CELERY_TASK_TYPE = Union[Task, PromiseProxy]
else:
CELERY_TASK_TYPE = Any


class Daemon:

@staticmethod
def summon(name: str, publisher: CELERY_TASK_TYPE, **heart_configs):
def decorator(worker: CELERY_TASK_TYPE):
def wrapper(*args, **kwargs):
# Initialize the daemon and broadcast heartbeat
daemon = Daemon(name=name, publisher=publisher, **heart_configs)
daemon.broadcast()
# Execute the worker function
output = worker.delay(*args, **kwargs)
# Kill the daemon
daemon.kill()
return output
return wrapper
return decorator

def __init__(
self,
name: str,
publisher: CELERY_TASK_TYPE,
**heart_configs
):
self.name = name
self.publisher = publisher
self.broadcast_thread: Optional[Thread] = None
self.heart = DaemonHeart.beat(
app_name=name,
**{ # type: ignore
"mode": DaemonHeart.Mode.MONITOR,
"enable_beat_logs": True,
"enable_pulse_monitor": True,
"pulse_monitor_frequency": None,
"pulse_monitor_sensibility_factor": 1.5,
**heart_configs
}
)

def kill(self):
# Stop heart
self.heart.stroke()
self.heart.join()
self.heart.close()
# Stop broadcast
self.broadcast_thread.join() if self.broadcast_thread is not None else None

def broadcast(self):
self.broadcast_thread: Thread = Thread(
target=lambda: self._broadcast(),
daemon=True
)
self.broadcast_thread.start()

def _broadcast(self):
while not self.heart.no_pulse.is_set():
payload = self.heart.queue_beat_logs.get(block=True)
self.publisher.delay(**payload)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from multiprocessing import Event, Queue, Process, Value
from threading import Thread

from .runtime import system_describe
from ..utils.runtime import system_describe


class DaemonHeart(Process):
Expand Down
7 changes: 7 additions & 0 deletions src/rhdzmota/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ def singleton(cls, overwrite_config_dictionary: Optional[Dict] = None) -> 'Logge
logger_manager = LoggerManager.singleton()
logger = logger_manager.get_logger(name=__name__)

# Celery

RHDZMOTA_CELERY_BROKER_HOST = get_environ_variable(
name="RHDZMOTA_CELERY_BROKER_HOST",
default="127.0.0.1"
)

# JWT

JWT_ENCRYPTION_ALGORITHM: str = get_environ_variable(
Expand Down
1 change: 1 addition & 0 deletions worker-hello.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
celery -A rhdzmota.celery_worker_hello worker --loglevel=INFO --concurrency=2 -O fair -P prefork

0 comments on commit 8a2ff23

Please sign in to comment.