From 849f61457d36dfca7329c8499681fbb51c2b292e Mon Sep 17 00:00:00 2001 From: Mark90 Date: Tue, 23 Jul 2024 23:37:51 +0200 Subject: [PATCH 1/3] WIP - add celery worker --- docker-compose.yml | 27 ++++++ docker/orchestrator/entrypoint_worker.sh | 6 ++ docker/orchestrator/orchestrator.env | 2 + main.py | 12 +++ requirements-worker.txt | 3 + requirements.txt | 1 + settings.py | 5 + tasks.py | 118 +++++++++++++++++++++++ 8 files changed, 174 insertions(+) create mode 100755 docker/orchestrator/entrypoint_worker.sh create mode 100644 requirements-worker.txt create mode 100644 tasks.py diff --git a/docker-compose.yml b/docker-compose.yml index 4ec5f79..3c36d6e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -175,6 +175,33 @@ services: postgres: condition: service_healthy + orchestrator-worker: + <<: *orchestrator + ports: + # - "8080:8080" + - "5679:5678" #Enable Python debugger + expose: + - 5679 #Enable Python debugger + volumes: # CLEANUP if it works + - ./workflows:/home/orchestrator/workflows + - ./products:/home/orchestrator/products + - ./migrations:/home/orchestrator/migrations + - ./docker:/home/orchestrator/etc + - ./main.py:/home/orchestrator/main.py + - ./graphql_federation.py:/home/orchestrator/graphql_federation.py + - ./settings.py:/home/orchestrator/settings.py + - ./tasks.py:/home/orchestrator/tasks.py + - ./utils:/home/orchestrator/utils + - ./services:/home/orchestrator/services + - ./requirements.txt:/home/orchestrator/requirements.txt + - ./requirements-worker.txt:/home/orchestrator/requirements-worker.txt + - ./translations:/home/orchestrator/translations + - ./templates:/home/orchestrator/templates + entrypoint: [ "/home/orchestrator/etc/orchestrator/entrypoint_worker.sh" ] + depends_on: + postgres: + condition: service_healthy + # # LSO # diff --git a/docker/orchestrator/entrypoint_worker.sh b/docker/orchestrator/entrypoint_worker.sh new file mode 100755 index 0000000..7ccaaed --- /dev/null +++ b/docker/orchestrator/entrypoint_worker.sh 
@@ -0,0 +1,6 @@ +#!/bin/bash +pip install -r requirements-worker.txt + +export PATH=$PATH:$HOME/.local/bin + +watchmedo auto-restart --patterns="*.py;*.json" --recursive -- celery -A tasks worker -E --loglevel=info -Q new_workflows,new_tasks,resume_tasks,resume_workflows --concurrency=1 -n%n diff --git a/docker/orchestrator/orchestrator.env b/docker/orchestrator/orchestrator.env index 86c8d49..16c8cf7 100644 --- a/docker/orchestrator/orchestrator.env +++ b/docker/orchestrator/orchestrator.env @@ -9,3 +9,5 @@ OAUTH2_ACTIVE=False LSO_PLAYBOOK_URL=http://example-orchestrator-lso-1:8000/api/playbook ORCHESTRATOR_URL=http://example-orchestrator-orchestrator-1:8080 FEDERATION_ENABLED=True +EXECUTOR=celery +CACHE_URI="redis://:nwa@redis:6379" diff --git a/main.py b/main.py index 10f99f9..436d411 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,11 @@ from orchestrator import OrchestratorCore from orchestrator.cli.main import app as core_cli from orchestrator.settings import AppSettings +from celery import Celery +from settings import backend, broker + +from orchestrator.settings import app_settings +from orchestrator.services.tasks import initialise_celery from graphql_federation import CUSTOM_GRAPHQL_MODELS import products # noqa: F401 Side-effects @@ -23,5 +28,12 @@ app = OrchestratorCore(base_settings=AppSettings()) app.register_graphql(graphql_models=CUSTOM_GRAPHQL_MODELS) +celery = Celery(app_settings.SERVICE_NAME, broker=broker, backend=backend, include=["orchestrator.services.tasks"]) +celery.conf.update( + result_expires=3600, +) +initialise_celery(celery) + + if __name__ == "__main__": core_cli() diff --git a/requirements-worker.txt b/requirements-worker.txt new file mode 100644 index 0000000..220d613 --- /dev/null +++ b/requirements-worker.txt @@ -0,0 +1,3 @@ +-r requirements.txt + +watchdog # Optional, used for hot reloading in development diff --git a/requirements.txt b/requirements.txt index 46c1d0a..14feb4d 100644 --- a/requirements.txt +++ b/requirements.txt 
@@ -2,3 +2,4 @@ orchestrator-core deepdiff rich pynetbox +celery diff --git a/settings.py b/settings.py index dcbd5e0..042eb0c 100644 --- a/settings.py +++ b/settings.py @@ -12,6 +12,7 @@ # limitations under the License. +from orchestrator.settings import app_settings from pydantic_settings import BaseSettings @@ -25,3 +26,7 @@ class Settings(BaseSettings): settings = Settings() + + +backend = str(app_settings.CACHE_URI) +broker = str(app_settings.CACHE_URI) diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..f2618a9 --- /dev/null +++ b/tasks.py @@ -0,0 +1,118 @@ +# Copyright 2019-2024 SURF. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from typing import Any +from uuid import UUID + +from celery import Celery +from celery.signals import setup_logging, worker_shutting_down +from structlog import get_logger + +from nwastdlib.logging import initialise_logging +from orchestrator.db import init_database +from orchestrator.db.database import ENGINE_ARGUMENTS +from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY +from orchestrator.log_config import LOGGER_OVERRIDES, logger_config +from orchestrator.services.tasks import initialise_celery +from orchestrator.settings import app_settings +from orchestrator.types import BroadcastFunc +from orchestrator.websocket import ( + broadcast_process_update_to_websocket, + init_websocket_manager, +) +from orchestrator.websocket.websocket_manager import WebSocketManager +from orchestrator.workflows import ALL_WORKFLOWS +from settings import backend, broker + + +logger = get_logger(__name__) + + +LOGGER_OVERRIDES_CELERY = LOGGER_OVERRIDES | dict( + [ + logger_config("celery"), + logger_config("kombu"), + ] +) + + +@setup_logging.connect # type: ignore[misc] +def on_setup_logging(**kwargs: Any) -> None: + initialise_logging(additional_loggers=LOGGER_OVERRIDES_CELERY) + + +def process_broadcast_fn(process_id: UUID) -> None: + # Catch all exceptions as broadcasting failure is noncritical to workflow completion + try: + broadcast_process_update_to_websocket(process_id) + except Exception as e: + logger.exception(e) + + +class OrchestratorWorker(Celery): + websocket_manager: WebSocketManager + process_broadcast_fn: BroadcastFunc + + def on_init(self) -> None: + # ENGINE_ARGUMENTS["pool_size"] = 10 + init_database(app_settings) + + # Prepare the wrapped_websocket_manager + # Note: cannot prepare the redis connections here as broadcasting is async + self.websocket_manager = init_websocket_manager(app_settings) + self.process_broadcast_fn = process_broadcast_fn + + # Load the products and load the workflows + import products # noqa: F401 Side-effects + import workflows 
# noqa: F401 Side-effects + + logger.info( + "Loaded the workflows and products", + workflows=len(ALL_WORKFLOWS.values()), + products=len(SUBSCRIPTION_MODEL_REGISTRY.values()), + ) + # register_forms() + + def close(self) -> None: + super().close() + + +celery = OrchestratorWorker( + f"{app_settings.SERVICE_NAME}-worker", broker=broker, include=["orchestrator.services.tasks"] +) + +if app_settings.TESTING: + celery.conf.update(result_backend=backend, task_ignore_result=False) +else: + celery.conf.update(task_ignore_result=True) + +celery.conf.update( + result_expires=3600, + worker_prefetch_multiplier=1, + worker_send_task_events=True, + task_send_sent_event=True, +) + +# Needed if we load this as a Celery worker because in that case there is no 'main app' +initialise_celery(celery) + + +@worker_shutting_down.connect # type: ignore +def worker_shutting_down_handler(sig, how, exitcode, **kwargs) -> None: + celery.close() + + +# TODO figure out what's missing in the workflow registration - without this I can't run any workflows
# [error ] Worker failed to execute workflow [orchestrator.services.tasks] details=Invalid workflow: module workflows.tasks.bootstrap_netbox does not exist or has invalid imports process_id=c1011606-b06d-419b-9a2f-69319610e7a5 +print("IMPORTING..") +from workflows.tasks.bootstrap_netbox import task_bootstrap_netbox # noqa +print("IMPORTED") From 218310387609a1999d785baf61f07e33b40d80a1 Mon Sep 17 00:00:00 2001 From: Mark90 Date: Wed, 24 Jul 2024 09:37:54 +0200 Subject: [PATCH 2/3] Set TESTING=false for async scheduling --- docker/orchestrator/orchestrator.env | 1 + 1 file changed, 1 insertion(+) diff --git a/docker/orchestrator/orchestrator.env b/docker/orchestrator/orchestrator.env index 16c8cf7..608cc68 100644 --- a/docker/orchestrator/orchestrator.env +++ b/docker/orchestrator/orchestrator.env @@ -11,3 +11,4 @@ ORCHESTRATOR_URL=http://example-orchestrator-orchestrator-1:8080 FEDERATION_ENABLED=True EXECUTOR=celery
CACHE_URI="redis://:nwa@redis:6379" +TESTING=false From 7c1d2fe0dd58418b132d85ad1c4cccaa77fc8e3b Mon Sep 17 00:00:00 2001 From: Mark90 Date: Wed, 24 Jul 2024 09:50:03 +0200 Subject: [PATCH 3/3] Update orchestrator entrypoint to install requirements before running migrations --- docker/orchestrator/entrypoint.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docker/orchestrator/entrypoint.sh b/docker/orchestrator/entrypoint.sh index 7c4c6c9..685d06f 100755 --- a/docker/orchestrator/entrypoint.sh +++ b/docker/orchestrator/entrypoint.sh @@ -1,6 +1,9 @@ #!/bin/bash -python main.py db upgrade heads + pip install -r requirements.txt + +python main.py db upgrade heads + python -u -m uvicorn --reload --proxy-headers --workers 4 --host 0.0.0.0 --port 8080 $UVICORN_ARGS main:app # python -u -m debugpy --listen 0.0.0.0:5678 -m uvicorn --reload --proxy-headers --workers 4 --host 0.0.0.0 --port 8080 $UVICORN_ARGS main:app # python -u -m debugpy --listen 0.0.0.0:5678 -m uvicorn --proxy-headers --workers 4 --host 0.0.0.0 --port 8080 $UVICORN_ARGS main:app