Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - add celery worker #21

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,33 @@ services:
postgres:
condition: service_healthy

# Celery worker service: reuses the orchestrator API service definition via the
# YAML merge key (*orchestrator anchor) but overrides the entrypoint so the
# container runs a Celery worker instead of uvicorn.
orchestrator-worker:
  <<: *orchestrator
  ports:
    # - "8080:8080"
    # Debugger mapped to a different host port (5679) than the API container.
    - "5679:5678" #Enable Python debugger
  expose:
    # NOTE(review): `expose` lists container-side ports, but 5679 is the *host*
    # port of the mapping above; the container listens on 5678 — confirm this
    # should read 5678 instead.
    - 5679 #Enable Python debugger
  # Bind-mount the source tree for development hot reload (see watchmedo in
  # entrypoint_worker.sh).
  volumes: # CLEANUP if it works
    - ./workflows:/home/orchestrator/workflows
    - ./products:/home/orchestrator/products
    - ./migrations:/home/orchestrator/migrations
    - ./docker:/home/orchestrator/etc
    - ./main.py:/home/orchestrator/main.py
    - ./graphql_federation.py:/home/orchestrator/graphql_federation.py
    - ./settings.py:/home/orchestrator/settings.py
    - ./tasks.py:/home/orchestrator/tasks.py
    - ./utils:/home/orchestrator/utils
    - ./services:/home/orchestrator/services
    - ./requirements.txt:/home/orchestrator/requirements.txt
    - ./requirements-worker.txt:/home/orchestrator/requirements-worker.txt
    - ./translations:/home/orchestrator/translations
    - ./templates:/home/orchestrator/templates
  # Worker-specific startup script (pip install + celery via watchmedo).
  entrypoint: [ "/home/orchestrator/etc/orchestrator/entrypoint_worker.sh" ]
  depends_on:
    postgres:
      condition: service_healthy

#
# LSO
#
Expand Down
5 changes: 4 additions & 1 deletion docker/orchestrator/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#!/bin/bash
# Container entrypoint for the orchestrator API.

# Install (possibly updated) dependencies first so migrations and the app run
# against the current requirements.
pip install -r requirements.txt

# Apply database migrations exactly once, after dependencies are available.
# (The previous version ran the upgrade twice, once before pip install.)
python main.py db upgrade heads

python -u -m uvicorn --reload --proxy-headers --workers 4 --host 0.0.0.0 --port 8080 $UVICORN_ARGS main:app
# Debugger variants (debugpy listening on :5678):
# python -u -m debugpy --listen 0.0.0.0:5678 -m uvicorn --reload --proxy-headers --workers 4 --host 0.0.0.0 --port 8080 $UVICORN_ARGS main:app
# python -u -m debugpy --listen 0.0.0.0:5678 -m uvicorn --proxy-headers --workers 4 --host 0.0.0.0 --port 8080 $UVICORN_ARGS main:app
6 changes: 6 additions & 0 deletions docker/orchestrator/entrypoint_worker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash
# Container entrypoint for the Celery worker (development setup).

# Install worker dependencies (requirements.txt plus watchdog for hot reload).
pip install -r requirements-worker.txt

# pip --user installs land in ~/.local/bin (watchmedo, celery executables).
export PATH=$PATH:$HOME/.local/bin

# watchmedo restarts the worker whenever *.py / *.json files change (dev hot
# reload). -E enables task events; the -Q queues match orchestrator-core's
# celery task routing.
# NOTE(review): "-n%n" has no space between the flag and the value — confirm
# celery parses this as the intended node-name format "%n".
watchmedo auto-restart --patterns="*.py;*.json" --recursive -- celery -A tasks worker -E --loglevel=info -Q new_workflows,new_tasks,resume_tasks,resume_workflows --concurrency=1 -n%n
3 changes: 3 additions & 0 deletions docker/orchestrator/orchestrator.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ OAUTH2_ACTIVE=False
LSO_PLAYBOOK_URL=http://example-orchestrator-lso-1:8000/api/playbook
ORCHESTRATOR_URL=http://example-orchestrator-orchestrator-1:8080
FEDERATION_ENABLED=True
# Run workflows on the Celery worker instead of the in-process threadpool.
EXECUTOR=celery
# Redis instance used as Celery broker and result backend (see settings.py).
CACHE_URI="redis://:nwa@redis:6379"
TESTING=false
12 changes: 12 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
from orchestrator import OrchestratorCore
from orchestrator.cli.main import app as core_cli
from orchestrator.settings import AppSettings
from celery import Celery
from settings import backend, broker

from orchestrator.settings import app_settings
from orchestrator.services.tasks import initialise_celery

from graphql_federation import CUSTOM_GRAPHQL_MODELS
import products # noqa: F401 Side-effects
Expand All @@ -23,5 +28,12 @@
# FastAPI/orchestrator application with the project's GraphQL models attached.
app = OrchestratorCore(base_settings=AppSettings())
app.register_graphql(graphql_models=CUSTOM_GRAPHQL_MODELS)

# Celery app for the API side: it only submits tasks to the broker; the actual
# worker is defined in tasks.py.
# NOTE(review): this configuration partially duplicates tasks.py — consider a
# single shared Celery factory to keep broker/backend settings in one place.
celery = Celery(app_settings.SERVICE_NAME, broker=broker, backend=backend, include=["orchestrator.services.tasks"])
celery.conf.update(
    result_expires=3600,  # keep task results for one hour
)
# Register this Celery app with orchestrator-core's task machinery.
initialise_celery(celery)


if __name__ == "__main__":
    core_cli()
3 changes: 3 additions & 0 deletions requirements-worker.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-r requirements.txt

watchdog # Optional, used for hot reloading in development
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ orchestrator-core
deepdiff
rich
pynetbox
celery
5 changes: 5 additions & 0 deletions settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.


from orchestrator.settings import app_settings
from pydantic_settings import BaseSettings


Expand All @@ -25,3 +26,7 @@ class Settings(BaseSettings):


# Singleton settings instance for this service.
settings = Settings()


# Celery connection URLs: both the broker and the result backend use the shared
# Redis instance configured via CACHE_URI (e.g. "redis://:nwa@redis:6379").
backend = str(app_settings.CACHE_URI)
broker = str(app_settings.CACHE_URI)
118 changes: 118 additions & 0 deletions tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Copyright 2019-2024 SURF.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from uuid import UUID

from celery import Celery
from celery.signals import setup_logging, worker_shutting_down
from structlog import get_logger

from nwastdlib.logging import initialise_logging
from orchestrator.db import init_database
from orchestrator.db.database import ENGINE_ARGUMENTS
from orchestrator.domain import SUBSCRIPTION_MODEL_REGISTRY
from orchestrator.log_config import LOGGER_OVERRIDES, logger_config
from orchestrator.services.tasks import initialise_celery
from orchestrator.settings import app_settings
from orchestrator.types import BroadcastFunc
from orchestrator.websocket import (
broadcast_process_update_to_websocket,
init_websocket_manager,
)
from orchestrator.websocket.websocket_manager import WebSocketManager
from orchestrator.workflows import ALL_WORKFLOWS
from settings import backend, broker


logger = get_logger(__name__)


# Core logger overrides extended with celery/kombu-specific logger configs.
# logger_config(name) yields a (name, config) pair, so dict(...) builds a
# mapping that is merged over LOGGER_OVERRIDES.
LOGGER_OVERRIDES_CELERY = {
    **LOGGER_OVERRIDES,
    **dict(logger_config(name) for name in ("celery", "kombu")),
}


@setup_logging.connect  # type: ignore[misc]
def on_setup_logging(**_kwargs: Any) -> None:
    """Celery signal hook: install the orchestrator's structlog-based logging."""
    initialise_logging(additional_loggers=LOGGER_OVERRIDES_CELERY)


def process_broadcast_fn(process_id: UUID) -> None:
    """Best-effort broadcast of a process update over the websocket.

    Broadcasting failure is non-critical to workflow completion, so any
    exception is logged and deliberately swallowed.
    """
    try:
        broadcast_process_update_to_websocket(process_id)
    except Exception as exc:  # noqa: BLE001 - best-effort by design
        logger.exception(exc)


class OrchestratorWorker(Celery):
    """Celery app subclass that boots the orchestrator runtime when the worker starts."""

    # Assigned in on_init(); manager used to push process updates to clients.
    websocket_manager: WebSocketManager
    # Assigned in on_init(); callback invoked after a process state change.
    process_broadcast_fn: BroadcastFunc

    def on_init(self) -> None:
        """Initialise DB, websocket manager and load products/workflows at worker start."""
        # ENGINE_ARGUMENTS["pool_size"] = 10
        init_database(app_settings)

        # Prepare the wrapped_websocket_manager
        # Note: cannot prepare the redis connections here as broadcasting is async
        self.websocket_manager = init_websocket_manager(app_settings)
        self.process_broadcast_fn = process_broadcast_fn

        # Load the products and load the workflows
        # (imported for their registration side effects only).
        import products  # noqa: F401 Side-effects
        import workflows  # noqa: F401 Side-effects

        logger.info(
            "Loaded the workflows and products",
            workflows=len(ALL_WORKFLOWS.values()),
            products=len(SUBSCRIPTION_MODEL_REGISTRY.values()),
        )
        # register_forms()

    def close(self) -> None:
        # Delegates to Celery's close(); kept explicit as the shutdown hook target.
        super().close()


# Worker-side Celery application. The result backend is only attached when
# TESTING, because production tasks ignore their results.
celery = OrchestratorWorker(
    f"{app_settings.SERVICE_NAME}-worker", broker=broker, include=["orchestrator.services.tasks"]
)

if app_settings.TESTING:
    # Tests need to inspect task results, so store them in the backend.
    celery.conf.update(backend=backend, task_ignore_result=False)
else:
    celery.conf.update(task_ignore_result=True)

celery.conf.update(
    result_expires=3600,  # expire stored results after one hour
    worker_prefetch_multiplier=1,  # fetch one task at a time (long-running workflows)
    # Fixed typo: the Celery setting is worker_send_task_events (plural); the
    # singular form was an unknown key and the intended setting never applied.
    # Required (with -E) for task event monitoring.
    worker_send_task_events=True,
    task_send_sent_event=True,
)

# Needed if we load this as a Celery worker because in that case there is no 'main app'
initialise_celery(celery)


@worker_shutting_down.connect  # type: ignore
def worker_shutting_down_handler(sig, how, exitcode, **kwargs) -> None:
    """Celery signal hook: close the worker app (and its resources) on shutdown."""
    # Parameter names (sig, how, exitcode) match the keyword arguments sent by
    # the worker_shutting_down signal and must not be renamed.
    celery.close()


# TODO figure out what's missing in the workflow registration - without this I can't run any workflows
# [error ] Worker failed to execute workflow [orchestrator.services.tasks] details=Invalid workflow: module workflows.tasks.bootstrap_netbox does not exist or has invalid imports process_id=c1011606-b06d-419b-9a2f-69319610e7a5
# Use the module logger instead of bare print() so the workaround shows up in
# structured worker logs.
logger.info("Importing workflows.tasks.bootstrap_netbox (workflow registration workaround)")
from workflows.tasks.bootstrap_netbox import task_bootstrap_netbox  # noqa
logger.info("Imported workflows.tasks.bootstrap_netbox (workflow registration workaround)")