Skip to content

Commit

Permalink
fix: WIP logging
Browse files Browse the repository at this point in the history
  • Loading branch information
tcjennings committed Jan 28, 2025
1 parent ecd4f18 commit b0c9aeb
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 25 deletions.
8 changes: 4 additions & 4 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

name: "${USER}-cm-service"
services:
init-db:
migratedb:
profiles:
- full
- server
Expand Down Expand Up @@ -46,13 +46,13 @@ services:
networks:
- cmservice
depends_on:
init-db:
migratedb:
condition: service_completed_successfully

cmworker:
profiles:
- full
- worker
- daemon
build:
context: .
dockerfile: docker/Dockerfile
Expand All @@ -66,7 +66,7 @@ services:
networks:
- cmservice
depends_on:
init-db:
migratedb:
condition: service_completed_successfully

postgresql:
Expand Down
39 changes: 22 additions & 17 deletions src/lsst/cmservice/common/daemon.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,42 @@
from datetime import datetime, timedelta

import structlog
from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.future import select

from ..common.logging import LOGGER
from ..config import config
from ..db.queue import Queue
from ..db.script import Script

logger = structlog.get_logger(__name__)
logger = LOGGER.bind(module=__name__)


async def daemon_iteration(session: async_scoped_session) -> None:
iteration_start = datetime.now()
queue_entries = await session.execute(select(Queue).where(Queue.time_next_check < iteration_start))
logger.debug("Daemon Iteration: %s", iteration_start)

# TODO: should the daemon check any campaigns with a state == prepared that
# do not have queues? Queue creation should not be a manual step.
queue_entry: Queue
for (queue_entry,) in queue_entries:
queued_node = await queue_entry.get_node(session)
if (
queued_node.status.is_processable_script()
if isinstance(queued_node, Script)
else queued_node.status.is_processable_element()
):
logger.info(f"Processing queue_entry {queued_node.fullname}")
await queue_entry.process_node(session)
sleep_time = await queue_entry.node_sleep_time(session)
else:
# Put this entry to sleep for a while
sleep_time = config.daemon.processing_interval
time_next_check = iteration_start + timedelta(seconds=sleep_time)
queue_entry.time_next_check = time_next_check
logger.info(f"Next check for {queued_node.fullname} at {time_next_check}")
try:
queued_node = await queue_entry.get_node(session)
if (
queued_node.status.is_processable_script()
if isinstance(queued_node, Script)
else queued_node.status.is_processable_element()
):
logger.info("Processing queue_entry %s", queued_node.fullname)
await queue_entry.process_node(session)
sleep_time = await queue_entry.node_sleep_time(session)
else:
# Put this entry to sleep for a while
sleep_time = config.daemon.processing_interval
time_next_check = iteration_start + timedelta(seconds=sleep_time)
queue_entry.time_next_check = time_next_check
logger.info(f"Next check for {queued_node.fullname} at {time_next_check}")
except Exception:
logger.exception()
continue
await session.commit()
10 changes: 10 additions & 0 deletions src/lsst/cmservice/common/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Provide a common application root logger."""

import structlog
from safir.logging import configure_logging

from ..config import config

configure_logging(profile=config.logging.profile, log_level=config.logging.level, name=config.logging.handle)

LOGGER = structlog.get_logger(config.logging.handle)
5 changes: 5 additions & 0 deletions src/lsst/cmservice/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ class AsgiConfiguration(BaseModel):
class LoggingConfiguration(BaseModel):
"""Configuration for the application's logging facility."""

handle: str = Field(
default="cm-service",
title="Handle or name of the root logger",
)

level: str = Field(
default="INFO",
title="Log level of the application's logger",
Expand Down
8 changes: 4 additions & 4 deletions src/lsst/cmservice/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import structlog
import uvicorn
from anyio import current_time, sleep_until
from fastapi import FastAPI
from safir.database import create_async_session, create_database_engine
from safir.logging import configure_logging, configure_uvicorn_logging
from safir.logging import configure_uvicorn_logging

from . import __version__
from .common.daemon import daemon_iteration
from .common.logging import LOGGER
from .config import config
from .routers.healthz import health_router

configure_uvicorn_logging(config.logging.level)
configure_logging(profile=config.logging.profile, log_level=config.logging.level, name=__name__)

logger = LOGGER.bind(module=__name__)


@asynccontextmanager
Expand All @@ -34,7 +35,6 @@ async def main_loop() -> None:
With a database session, perform a single daemon interation and then sleep
until the next daemon appointment.
"""
logger = structlog.get_logger(__name__)
engine = create_database_engine(config.db.url, config.db.password)
sleep_time = config.daemon.processing_interval

Expand Down
3 changes: 3 additions & 0 deletions src/lsst/cmservice/db/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@

from ..common.enums import StatusEnum
from ..common.errors import CMBadHandlerTypeError
from ..common.logging import LOGGER
from ..common.utils import add_sys_path

if TYPE_CHECKING:
from .element import ElementMixin
from .node import NodeMixin
from .script import Script

logger = LOGGER.bind(module=__name__)


class Handler:
"""Base class to handle callbacks generated by particular
Expand Down
4 changes: 4 additions & 0 deletions src/lsst/cmservice/db/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
CMResolveCollectionsError,
test_type_and_raise,
)
from ..common.logging import LOGGER
from ..config import config
from .handler import Handler
from .row import RowMixin
Expand All @@ -25,6 +26,8 @@
from .campaign import Campaign
from .element import ElementMixin

logger = LOGGER.bind(module=__name__)


class NodeMixin(RowMixin):
"""Mixin class to define common features of database rows
Expand Down Expand Up @@ -734,6 +737,7 @@ async def process(
Status of the processing
"""
handler = await self.get_handler(session)
logger.debug("Processing node with handler %s", handler.get_handler_class_name())
return await handler.process(session, self, **kwargs)

async def run_check(
Expand Down
5 changes: 5 additions & 0 deletions src/lsst/cmservice/db/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from ..common.enums import LevelEnum
from ..common.errors import CMBadEnumError, CMMissingFullnameError
from ..common.logging import LOGGER
from .base import Base
from .campaign import Campaign
from .group import Group
Expand All @@ -18,6 +19,8 @@
from .script import Script
from .step import Step

logger = LOGGER.bind(module=__name__)


class Queue(Base, NodeMixin):
"""Database table to implement processing queue"""
Expand Down Expand Up @@ -214,9 +217,11 @@ async def process_node(
node = await self.get_node(session)

if node.level == LevelEnum.script:
logger.debug("Processing a %s", node.level)
if not node.status.is_processable_script():
return False
if not node.status.is_processable_element():
logger.debug("Node %s is not processable", node.name)
return False

process_kwargs: dict = {}
Expand Down
3 changes: 3 additions & 0 deletions src/lsst/cmservice/handlers/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
)
from ..common.enums import LevelEnum, ScriptMethodEnum, StatusEnum
from ..common.errors import CMBadExecutionMethodError, CMMissingScriptInputError, test_type_and_raise
from ..common.logging import LOGGER
from ..config import config
from ..db.element import ElementMixin
from ..db.script import Script
from ..db.step import Step
from .script_handler import ScriptHandler

logger = LOGGER.bind(module=__name__)


class NullScriptHandler(ScriptHandler):
"""A no-op script, mostly for testing"""
Expand Down

0 comments on commit b0c9aeb

Please sign in to comment.