diff --git a/Dockerfile b/Dockerfile index 2602375e..f95c86d4 100755 --- a/Dockerfile +++ b/Dockerfile @@ -39,7 +39,7 @@ COPY chris_backend/ ./ RUN if [ "$ENVIRONMENT" = "production" ]; then \ env DJANGO_SETTINGS_MODULE=config.settings.common ./manage.py collectstatic; fi -CMD ["gunicorn", "-b", "0.0.0.0:8000", "-w", "4", "config.wsgi:application"] +CMD ["daphne", "-p", "8000", "config.asgi:application"] HEALTHCHECK --interval=30s --timeout=5s \ CMD curl -f http://localhost:8000/api/v1/users/ || exit 1 diff --git a/README.md b/README.md index 657a460e..e6c142f5 100755 --- a/README.md +++ b/README.md @@ -74,6 +74,8 @@ just prefer podman # or just prefer docker ``` +With Podman, RabbitMQ might fail to start. Simply retry the command. See https://github.com/FNNDSC/ChRIS_ultron_backEnd/issues/573 + ### Just Commands diff --git a/chris_backend/config/asgi.py b/chris_backend/config/asgi.py index 789432b0..a909fe40 100755 --- a/chris_backend/config/asgi.py +++ b/chris_backend/config/asgi.py @@ -10,9 +10,22 @@ import os, sys sys.path.append(os.path.dirname(os.path.dirname(__file__))) +from core.websockets.urls import websocket_urlpatterns +from core.websockets.auth import TokenQsAuthMiddleware + +from channels.routing import ProtocolTypeRouter, URLRouter from django.core.asgi import get_asgi_application +from channels.security.websocket import AllowedHostsOriginValidator os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.production') -application = get_asgi_application() +django_asgi_app = get_asgi_application() + +# see https://channels.readthedocs.io/en/3.x/installation.html +application = ProtocolTypeRouter({ + 'http': django_asgi_app, + 'websocket': AllowedHostsOriginValidator( + TokenQsAuthMiddleware(URLRouter(websocket_urlpatterns)) + ), +}) diff --git a/chris_backend/config/settings/common.py b/chris_backend/config/settings/common.py index c39ca94c..27e4a913 100755 --- a/chris_backend/config/settings/common.py +++ b/chris_backend/config/settings/common.py @@ -19,6 +19,7 @@ # Application definition INSTALLED_APPS = [ + 'daphne', 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', @@ -99,7 +100,7 @@ }, ] -WSGI_APPLICATION = 'config.wsgi.application' +ASGI_APPLICATION = 'config.asgi.application' # Database diff --git a/chris_backend/config/settings/local.py b/chris_backend/config/settings/local.py index 296cd494..2cac4fa0 100755 --- a/chris_backend/config/settings/local.py +++ b/chris_backend/config/settings/local.py @@ -161,6 +161,9 @@ CORS_EXPOSE_HEADERS = ['Allow', 'Content-Type', 'Content-Length'] +# NATS settings +NATS_ADDRESS = 'nats://nats:4222' + # Celery settings #CELERY_BROKER_URL = 'amqp://guest:guest@localhost' diff --git a/chris_backend/config/settings/production.py b/chris_backend/config/settings/production.py index 94f04bcc..2e9da7f9 100755 --- a/chris_backend/config/settings/production.py +++ b/chris_backend/config/settings/production.py @@ -128,6 +128,11 @@ def get_secret(setting, secret_type=env): CORS_ALLOWED_ORIGINS = get_secret('DJANGO_CORS_ALLOWED_ORIGINS', env.list) +# NATS SETTINGS +# ------------------------------------------------------------------------------ +NATS_ADDRESS = get_secret('NATS_ADDRESS') + + # CELERY SETTINGS # ------------------------------------------------------------------------------ CELERY_BROKER_URL = get_secret('CELERY_BROKER_URL') diff --git a/chris_backend/config/wsgi.py b/chris_backend/config/wsgi.py deleted file mode 100755 index 4de1837e..00000000 --- a/chris_backend/config/wsgi.py +++ /dev/null @@ -1,18 +0,0 @@ -""" -WSGI config for chris_backend project. - -It exposes the WSGI callable as a module-level variable named ``application``. - -For more information on this file, see -https://docs.djangoproject.com/en/4.2/howto/deployment/wsgi/ -""" - -import os, sys -sys.path.append(os.path.dirname(os.path.dirname(__file__))) - -from django.core.wsgi import get_wsgi_application - - -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.production') - -application = get_wsgi_application() diff --git a/chris_backend/core/celery.py b/chris_backend/core/celery.py index 09fe8350..23fbd439 100755 --- a/chris_backend/core/celery.py +++ b/chris_backend/core/celery.py @@ -33,6 +33,7 @@ {'queue': 'periodic'}, 'plugininstances.tasks.cancel_waiting_plugin_instances': {'queue': 'periodic'}, + 'pacsfiles.tasks.register_pacs_series': {'queue': 'main2'} } app.conf.update(task_routes=task_routes) diff --git a/chris_backend/core/middleware.py b/chris_backend/core/middleware.py index 29cebe47..fd926bc1 100755 --- a/chris_backend/core/middleware.py +++ b/chris_backend/core/middleware.py @@ -41,7 +41,7 @@ def __call__(self, request): return self.get_response(request) def process_exception(self, request, exception): - print(exception) + print(exception, flush=True) mime = request.META.get('HTTP_ACCEPT') if mime != 'text/html': return api_500(request) diff --git a/chris_backend/core/views.py b/chris_backend/core/views.py index 7fcab7f1..c0541214 100755 --- a/chris_backend/core/views.py +++ b/chris_backend/core/views.py @@ -124,31 +124,31 @@ def authenticate(self, request): # Check if 'download_token' is in the request query params if 'download_token' in request.query_params: token = request.query_params['download_token'] - err_msg = f'Invalid file download token: {token}' - - try: - info = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256']) - except jwt.ExpiredSignatureError: - err_msg = f'Expired file download token: {token}' - logger.error(err_msg) - raise exceptions.AuthenticationFailed(err_msg) - except jwt.InvalidTokenError: - logger.error(err_msg) - raise exceptions.AuthenticationFailed(err_msg) - - try: - user = User.objects.get(username=info['user']) - except User.DoesNotExist: - logger.error(err_msg) - raise exceptions.AuthenticationFailed(err_msg) - - try: - token_obj = FileDownloadToken.objects.get(owner=user, token=token) - except FileDownloadToken.DoesNotExist: - logger.error(err_msg) - raise exceptions.AuthenticationFailed(err_msg) - - token_obj.delete() # one-time-use token, we could instead set revoked=true - return user, None - + return authenticate_token(token), None return super(TokenAuthSupportQueryString, self).authenticate(request) + + +def authenticate_token(token: str) -> User: + err_msg = f'Invalid file download token: {token}' + try: + info = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256']) + except jwt.ExpiredSignatureError: + err_msg = f'Expired file download token: {token}' + logger.error(err_msg) + raise exceptions.AuthenticationFailed(err_msg) + except jwt.InvalidTokenError: + logger.error(err_msg) + raise exceptions.AuthenticationFailed(err_msg) + + try: + user = User.objects.get(username=info['user']) + except User.DoesNotExist: + logger.error(err_msg) + raise exceptions.AuthenticationFailed(err_msg) + + token_obj = FileDownloadToken.objects.filter(owner=user, token=token).first() + if token_obj is None: + raise exceptions.AuthenticationFailed(err_msg) + + token_obj.delete() # one-time-use token, we could instead set revoked=true + return user diff --git a/chris_backend/core/websockets/__init__.py b/chris_backend/core/websockets/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/chris_backend/core/websockets/auth.py b/chris_backend/core/websockets/auth.py new file mode 100755 index 00000000..b1ca99bf --- /dev/null +++ b/chris_backend/core/websockets/auth.py @@ -0,0 +1,53 @@ +""" +Websockets authentication. + +Notes +----- + +``channels.auth.AuthMiddlewareStack`` depends on HTTP headers, however HTTP headers +cannot be set for websockets in the web browser. https://stackoverflow.com/a/4361358 + +A common pattern is to put a token in the query string. We will re-use the file "downloadtokens" +for this purpose. +""" + +import urllib.parse +from typing import AnyStr + +from channels.db import database_sync_to_async +from django.contrib.auth.models import User +from rest_framework.exceptions import AuthenticationFailed + +from core.views import authenticate_token + + +class TokenQsAuthMiddleware: + """ + Authenticate the request using :class:`TokenAuthSupportQueryString`. + + Based on + https://channels.readthedocs.io/en/3.x/topics/authentication.html#custom-authentication + """ + + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + scope['user'] = await self._authenticate(scope) + return await self.app(scope, receive, send) + + @database_sync_to_async + def _authenticate(self, scope) -> User | None: + params = _parse_qs_last_string(scope.get('query_string', b'')) + if token := params.get('token', None): + try: + return authenticate_token(token) + except AuthenticationFailed: + return None + return None + + +def _parse_qs_last_string(qs: AnyStr, encoding='utf-8') -> dict[str, str]: + if isinstance(qs, bytes): + qs = qs.decode(encoding=encoding) + return {k: v[-1] for k, v in urllib.parse.parse_qs(qs).items()} diff --git a/chris_backend/core/websockets/urls.py b/chris_backend/core/websockets/urls.py new file mode 100755 index 00000000..f7217b3f --- /dev/null +++ b/chris_backend/core/websockets/urls.py @@ -0,0 +1,11 @@ +""" +Async routes. +""" + +from django.urls import re_path +from pacsfiles import consumers + +websocket_urlpatterns = [ + re_path(r'v1/pacs/ws/', + consumers.PACSFileProgress.as_asgi()), +] \ No newline at end of file diff --git a/chris_backend/pacsfiles/consumers.py b/chris_backend/pacsfiles/consumers.py new file mode 100755 index 00000000..7ce50662 --- /dev/null +++ b/chris_backend/pacsfiles/consumers.py @@ -0,0 +1,88 @@ +import rest_framework.permissions +from channels.db import database_sync_to_async +from channels.generic.websocket import AsyncJsonWebsocketConsumer +from django.conf import settings +from rest_framework import permissions + +from pacsfiles.lonk import ( + LonkClient, + validate_subscription, + LonkWsSubscription, + Lonk, +) +from pacsfiles.permissions import IsChrisOrIsPACSUserReadOnly + + +class PACSFileProgress(AsyncJsonWebsocketConsumer): + """ + A WebSockets endpoint which relays progress messages from NATS sent by *oxidicom* to a client. + """ + + permission_classes = ( + permissions.IsAuthenticated, + IsChrisOrIsPACSUserReadOnly, + ) + + async def connect(self): + if not await self._has_permission(): + return await self.close() + self.client: LonkClient = await LonkClient.connect( + settings.NATS_ADDRESS + ) + await self.accept() + + async def receive_json(self, content, **kwargs): + if validate_subscription(content): + await self._subscribe( + content['pacs_name'], content['SeriesInstanceUID'] + ) + return + await self.close(code=400, reason='Invalid subscription') + + async def _subscribe(self, pacs_name: str, series_instance_uid: str): + """ + Subscribe to progress notifications about the reception of a DICOM series. + """ + try: + await self.client.subscribe( + pacs_name, series_instance_uid, lambda msg: self.send_json(msg) + ) + response = Lonk( + pacs_name=pacs_name, + SeriesInstanceUID=series_instance_uid, + message=LonkWsSubscription(subscription='subscribed'), + ) + await self.send_json(response) + except Exception as e: + response = Lonk( + pacs_name=pacs_name, + SeriesInstanceUID=series_instance_uid, + message=LonkWsSubscription(subscription='error'), + ) + await self.send_json(response) + await self.close(code=500) + raise e + + async def disconnect(self, code): + await super().disconnect(code) + await self.client.close() + + @database_sync_to_async + def _has_permission(self) -> bool: + """ + Manual permissions check. + + django-channels is going to handle authentication for us, + but we need to implement permissions ourselves. + """ + self.user = self.scope.get('user', None) + if self.user is None: + return False + if getattr(self, 'method', None) is None: + # make it work with ``IsChrisOrIsPACSUserReadOnly`` + self.method = rest_framework.permissions.SAFE_METHODS[0] + + return all( + permission().has_permission(self, self.__class__) + for permission in self.permission_classes + ) diff --git a/chris_backend/pacsfiles/lonk.py b/chris_backend/pacsfiles/lonk.py new file mode 100644 index 00000000..b4dc51bd --- /dev/null +++ b/chris_backend/pacsfiles/lonk.py @@ -0,0 +1,209 @@ +""" +Implementation of the "Light Oxidicom NotifiKations Encoding" + +See https://chrisproject.org/docs/oxidicom/lonk +""" +import asyncio +import enum +from sys import byteorder +from typing import ( + Self, + Callable, + TypedDict, + Literal, + TypeGuard, + Any, + Awaitable, +) + +import nats +from nats import NATS +from nats.aio.subscription import Subscription +from nats.aio.msg import Msg + + +class SubscriptionRequest(TypedDict): + """ + A request to subscribe to LONK notifications about a DICOM series. + """ + + pacs_name: str + SeriesInstanceUID: str + action: Literal['subscribe'] + + +def validate_subscription(data: Any) -> TypeGuard[SubscriptionRequest]: + if not isinstance(data, dict): + return False + return ( + data.get('action', None) == 'subscribe' + and isinstance(data.get('SeriesInstanceUID', None), str) + and isinstance(data.get('pacs_name', None), str) + ) + + +class LonkProgress(TypedDict): + """ + LONK "done" message. + + https://chrisproject.org/docs/oxidicom/lonk#lonk-message-encoding + """ + + ndicom: int + + +class LonkError(TypedDict): + """ + LONK "error" message. + + https://chrisproject.org/docs/oxidicom/lonk#lonk-message-encoding + """ + + error: str + + +class LonkDone(TypedDict): + """ + LONK "done" message. + + https://chrisproject.org/docs/oxidicom/lonk#lonk-message-encoding + """ + + done: bool + + +class LonkWsSubscription(TypedDict): + """ + LONK-WS "subscribed" message. + + https://chrisproject.org/docs/oxidicom/lonk-ws#lonk-ws-subscription + """ + + subscription: Literal['subscribed', 'error'] + + +LonkMessageData = LonkProgress | LonkError | LonkDone | LonkWsSubscription +""" +Lonk message data. + +https://chrisproject.org/docs/oxidicom/lonk-ws#messages +""" + + +class Lonk(TypedDict): + """ + Serialized LONK message about a DICOM series. + + https://chrisproject.org/docs/oxidicom#lonk-message-encoding + """ + + SeriesInstanceUID: str + pacs_name: str + message: LonkMessageData + + +class LonkClient: + """ + "Light Oxidicom NotifiKations Encoding" client: + A client for the messages sent by *oxidicom* over NATS. + + https://chrisproject.org/docs/oxidicom/lonk + """ + + def __init__(self, nc: NATS): + self._nc = nc + self._subscriptions: list[Subscription] = [] + + @classmethod + async def connect(cls, servers: str | list[str]) -> Self: + return cls(await nats.connect(servers)) + + async def subscribe( + self, + pacs_name: str, + series_instance_uid: str, + cb: Callable[[Lonk], Awaitable[None]], + ): + subject = subject_of(pacs_name, series_instance_uid) + cb = _curry_message2json(pacs_name, series_instance_uid, cb) + subscription = await self._nc.subscribe(subject, cb=cb) + self._subscriptions.append(subscription) + return subscription + + async def close(self): + await asyncio.gather(*(s.unsubscribe() for s in self._subscriptions)) + await self._nc.close() + + +def subject_of(pacs_name: str, series_instance_uid: str) -> str: + """ + Get the NATS subject for a series. + + Equivalent to https://github.com/FNNDSC/oxidicom/blob/33838f22a5431a349b3b83a313035b8e22d16bb1/src/lonk.rs#L36-L48 + """ + return f'oxidicom.{_sanitize_topic_part(pacs_name)}.{_sanitize_topic_part(series_instance_uid)}' + + +def _sanitize_topic_part(s: str) -> str: + return ( + s.replace('\0', '') + .replace(' ', '_') + .replace('.', '_') + .replace('*', '_') + .replace('>', '_') + ) + + +def _message2json( + pacs_name: str, series_instance_uid: str, message: Msg +) -> Lonk: + return Lonk( + pacs_name=pacs_name, + SeriesInstanceUID=series_instance_uid, + message=_serialize_to_lonkws(message.data), + ) + + +def _curry_message2json( + pacs_name: str, + series_instance_uid: str, + cb: Callable[[Lonk], Awaitable[None]], +): + async def nats_callback(message: Msg): + lonk = _message2json(pacs_name, series_instance_uid, message) + await cb(lonk) + + return nats_callback + + +@enum.unique +class LonkMagicByte(enum.IntEnum): + """ + LONK message first magic byte. + """ + + DONE = 0x00 + PROGRESS = 0x01 + ERROR = 0x02 + + +def _serialize_to_lonkws(payload: bytes) -> LonkMessageData: + """ + Translate LONK binary encoding to LONK-WS JSON. + """ + if len(payload) == 0: + raise ValueError('Empty message') + data = payload[1:] + + match payload[0]: + case LonkMagicByte.DONE.value: + return LonkDone(done=True) + case LonkMagicByte.PROGRESS.value: + ndicom = int.from_bytes(data, 'little', signed=False) + return LonkProgress(ndicom=ndicom) + case LonkMagicByte.ERROR.value: + error = data.decode(encoding='utf-8') + return LonkError(error=error) + case _: + hexstr = ' '.join(hex(b) for b in payload) + raise ValueError(f'Unrecognized message: {hexstr}') diff --git a/chris_backend/pacsfiles/tasks.py b/chris_backend/pacsfiles/tasks.py new file mode 100755 index 00000000..8f09fbbc --- /dev/null +++ b/chris_backend/pacsfiles/tasks.py @@ -0,0 +1,60 @@ +from typing import Optional +from celery import shared_task +from django.contrib.auth.models import User + +from .serializers import PACSSeriesSerializer + + +@shared_task +def register_pacs_series( + PatientID: str, + StudyDate: str, + StudyInstanceUID: str, + SeriesInstanceUID: str, + pacs_name: str, + path: str, + ndicom: int, + PatientName: Optional[str] = None, + PatientBirthDate: Optional[str] = None, + PatientAge: Optional[int] = None, + PatientSex: Optional[str] = None, + AccessionNumber: Optional[str] = None, + Modality: Optional[str] = None, + ProtocolName: Optional[str] = None, + StudyDescription: Optional[str] = None, + SeriesDescription: Optional[str] = None, +): + """ + Register a DICOM series (directory of DICOM files) to the database. + + Pre-condition: DICOM files *must* exist in storage before running this task. + """ + data = { + 'PatientID': PatientID, + 'PatientName': PatientName, + 'PatientBirthDate': PatientBirthDate, + 'PatientAge': PatientAge, + 'PatientSex': PatientSex, + 'StudyDate': StudyDate, + 'AccessionNumber': AccessionNumber, + 'Modality': Modality, + 'ProtocolName': ProtocolName, + 'StudyInstanceUID': StudyInstanceUID, + 'StudyDescription': StudyDescription, + 'SeriesInstanceUID': SeriesInstanceUID, + 'SeriesDescription': SeriesDescription, + 'pacs_name': pacs_name, + 'path': path, + 'ndicom': ndicom, + } + serializer = PACSSeriesSerializer(data=_filter_some_values(data)) + serializer.is_valid(raise_exception=True) + owner = User.objects.get(username='chris') + serializer.save(owner=owner) + + +def _filter_some_values(x: dict[str, any]) -> dict[str, any]: + """ + Remove entries where the value is ``None``.` + """ + return {k: v for k, v in x.items() if v is not None} diff --git a/chris_backend/pacsfiles/tests/mocks.py b/chris_backend/pacsfiles/tests/mocks.py new file mode 100644 index 00000000..4f4825bf --- /dev/null +++ b/chris_backend/pacsfiles/tests/mocks.py @@ -0,0 +1,47 @@ +from typing import Self + +import nats +from nats import NATS + +from pacsfiles import lonk +from pacsfiles.lonk import LonkMagicByte + + +class Mockidicom: + """ + A mock *oxidicom* which sends LONK messages to NATS. + + Somewhat similar to https://github.com/FNNDSC/oxidicom/blob/e6bb83d1ea2fbaf5bb4af7dbf518a4b1a2957f2d/src/lonk.rs + """ + + def __init__(self, nc: NATS): + self._nc = nc + + @classmethod + async def connect(cls, servers: str | list[str]) -> Self: + nc = await nats.connect(servers) + return cls(nc) + + async def send_progress( + self, pacs_name: str, SeriesInstanceUID: str, ndicom: int + ): + subject = lonk.subject_of(pacs_name, SeriesInstanceUID) + u32 = ndicom.to_bytes(length=4, byteorder='little', signed=False) + data = LonkMagicByte.PROGRESS.value.to_bytes() + u32 + await self._nc.publish(subject, data) + + async def send_done(self, pacs_name: str, SeriesInstanceUID: str): + subject = lonk.subject_of(pacs_name, SeriesInstanceUID) + await self._nc.publish(subject, LonkMagicByte.DONE.value.to_bytes()) + + async def send_error( + self, pacs_name: str, SeriesInstanceUID: str, error: str + ): + subject = lonk.subject_of(pacs_name, SeriesInstanceUID) + data = LonkMagicByte.ERROR.value.to_bytes() + error.encode( + encoding='utf-8' + ) + await self._nc.publish(subject, data) + + async def close(self): + self._nc.close() diff --git a/chris_backend/pacsfiles/tests/test_consumers.py b/chris_backend/pacsfiles/tests/test_consumers.py new file mode 100644 index 00000000..458992a7 --- /dev/null +++ b/chris_backend/pacsfiles/tests/test_consumers.py @@ -0,0 +1,123 @@ +import jwt +from channels.db import database_sync_to_async +from django.conf import settings +from django.contrib.auth.models import User, Group + +# note: use TransactionTestCase instead of TestCase for async tests that speak to DB. +# See https://stackoverflow.com/a/71763849 +from django.test import TransactionTestCase, tag + +from channels.testing import WebsocketCommunicator +from django.utils import timezone + +from core.models import FileDownloadToken +from core.websockets.auth import TokenQsAuthMiddleware + +from pacsfiles.lonk import ( + SubscriptionRequest, + Lonk, + LonkWsSubscription, + LonkProgress, + LonkDone, + LonkError, +) +from pacsfiles.consumers import PACSFileProgress +from pacsfiles.tests.mocks import Mockidicom + + +class PACSFileProgressTests(TransactionTestCase): + def setUp(self): + self.username = 'PintoGideon' + self.password = 'gideon1234' + self.email = 'gideon@example.org' + self.user = User.objects.create_user( + username=self.username, email=self.email, password=self.password + ) + pacs_grp, _ = Group.objects.get_or_create(name='pacs_users') + self.user.groups.set([pacs_grp]) + self.user.save() + + @tag('integration') + async def test_lonk_ws(self): + token = await self._get_download_token() + app = TokenQsAuthMiddleware(PACSFileProgress.as_asgi()) + communicator = WebsocketCommunicator( + app, f'v1/pacs/ws/?token={token.token}' + ) + connected, subprotocol = await communicator.connect() + assert connected + + oxidicom: Mockidicom = await Mockidicom.connect(settings.NATS_ADDRESS) + + series1 = {'pacs_name': 'MyPACS', 'SeriesInstanceUID': '1.234.567890'} + subscription_request = SubscriptionRequest( + action='subscribe', **series1 + ) + await communicator.send_json_to(subscription_request) + self.assertEqual( + await communicator.receive_json_from(), + Lonk( + message=LonkWsSubscription(subscription='subscribed'), + **series1, + ), + ) + series2 = {'pacs_name': 'MyPACS', 'SeriesInstanceUID': '5.678.90123'} + subscription_request = SubscriptionRequest( + action='subscribe', **series2 + ) + await communicator.send_json_to(subscription_request) + self.assertEqual( + await communicator.receive_json_from(), + Lonk( + message=LonkWsSubscription(subscription='subscribed'), + **series2, + ), + ) + + await oxidicom.send_progress(ndicom=1, **series1) + self.assertEqual( + await communicator.receive_json_from(), + Lonk(message=LonkProgress(ndicom=1), **series1), + ) + await oxidicom.send_progress(ndicom=115, **series1) + self.assertEqual( + await communicator.receive_json_from(), + Lonk(message=LonkProgress(ndicom=115), **series1), + ) + + await oxidicom.send_error(error='stuck in chimney', **series2) + self.assertEqual( + await communicator.receive_json_from(), + Lonk(message=LonkError(error='stuck in chimney'), **series2), + ) + + await oxidicom.send_progress(ndicom=192, **series1) + self.assertEqual( + await communicator.receive_json_from(), + Lonk(message=LonkProgress(ndicom=192), **series1), + ) + await oxidicom.send_done(**series1) + self.assertEqual( + await communicator.receive_json_from(), + Lonk(message=LonkDone(done=True), **series1), + ) + + async def test_unauthenticated_not_connected(self): + app = TokenQsAuthMiddleware(PACSFileProgress.as_asgi()) + communicator = WebsocketCommunicator(app, 'v1/pacs/ws/') # no token + connected, subprotocol = await communicator.connect() + assert not connected + + @database_sync_to_async + def _get_download_token(self) -> FileDownloadToken: + """ + Copy-pasted from + https://github.com/FNNDSC/ChRIS_ultron_backEnd/blob/7bcccc2031386955875ef4e9758025577f5ee067/chris_backend/userfiles/tests/test_views.py#L210-L213 + """ + dt = timezone.now() + timezone.timedelta(minutes=10) + token = jwt.encode( + {'user': self.user.username, 'exp': dt}, + settings.SECRET_KEY, + algorithm='HS256', + ) + return FileDownloadToken.objects.create(token=token, owner=self.user) diff --git a/chris_backend/pacsfiles/tests/test_tasks.py b/chris_backend/pacsfiles/tests/test_tasks.py new file mode 100755 index 00000000..30da2844 --- /dev/null +++ b/chris_backend/pacsfiles/tests/test_tasks.py @@ -0,0 +1,108 @@ +import logging +import json +import io +from unittest import mock + +from django.test import TestCase, tag +from django.conf import settings +from django.contrib.auth.models import User, Group +from django.urls import reverse + +from rest_framework import status +from rest_framework.exceptions import ValidationError + +from core.models import ChrisFolder +from core.storage import connect_storage +from pacsfiles.models import PACS, PACSSeries, PACSFile +from pacsfiles.tasks import register_pacs_series + + +class PACSSeriesCreateTests(TestCase): + """ + Test creating PACS series using the task function. + """ + + def setUp(self): + self.storage_manager = connect_storage(settings) + + def tearDown(self): + super().tearDown() + test_data_dir = 'SERVICES/PACS/MyPACS/123456-crazy' + if self.storage_manager.path_exists(test_data_dir): + self.storage_manager.delete_path(test_data_dir) + + @tag('integration') + def test_integration_pacs_series_create_success(self): + series_instance_uid = '1.1.3432.54.6545674765.765434' + series_dir = 'SERVICES/PACS/MyPACS/123456-crazy/brain_crazy_study/SAG_T1_MPRAGE1/file2.dcm' + path = series_dir + '/file2.dcm' + fake_dicom_content = b'test file content' + # upload fake file to storage + self.storage_manager.upload_obj(path, fake_dicom_content) + + # invoke the task to register the file + register_pacs_series( + path=path, + ndicom=1, + PatientID='12345', + PatientName='crazy', + PatientSex='O', + StudyDate='2020-07-15', + StudyInstanceUID='1.1.3432.54.6545674765.765434', + StudyDescription='brain crazy study', + SeriesInstanceUID=series_instance_uid, + SeriesDescription='SAG T1 MPRAGE', + pacs_name='MyPACS', + ) + + # assert PACSSeries and PACSFile were created + series = PACSSeries.objects.get(SeriesInstanceUID=series_instance_uid) + self.assertEqual(series.StudyDescription, 'brain crazy study') + + def test_pacs_series_create_failure_already_exists(self): + pacs_path = 'SERVICES/PACS/MyPACS' + series_path = ( + f'{pacs_path}/123456-crazy/brain_crazy_study/SAG_T1_MPRAGE' + ) + + owner = User.objects.get(username='chris') + series_folder, _ = ChrisFolder.objects.get_or_create( + path=series_path, owner=owner + ) + pacs_folder, _ = ChrisFolder.objects.get_or_create( + path=pacs_path, owner=owner + ) + pacs, _ = PACS.objects.get_or_create( + identifier='MyPACS', folder=pacs_folder + ) + existing_series, _ = PACSSeries.objects.get_or_create( + PatientID='123456', + StudyDate='2020-07-15', + StudyInstanceUID='1.1.3432.54.6545674765.765434', + SeriesInstanceUID='2.4.3432.54.845674765.763345', + pacs=pacs, + folder=series_folder, + ) + + file_content = b'example DICOM which should not be registered' + self.storage_manager.upload_obj( + f'{series_path}/file1.dcm', file_content + ) + expected = ( + f'A DICOM series with SeriesInstanceUID={existing_series.SeriesInstanceUID} ' + 'already registered for pacs MyPACS' + ) + with self.assertRaisesRegex(ValidationError, expected): + register_pacs_series( + PatientID='12345', + StudyDate='2020-07-15', + StudyInstanceUID='1.1.3432.54.6545674765.765434', + SeriesInstanceUID='2.4.3432.54.845674765.763345', + pacs_name='MyPACS', + path=series_path, + ndicom=1, + PatientName='crazy', + PatientSex='O', + StudyDescription='brain_crazy_study', + SeriesDescription='SAG T1 MPRAGE', + ) diff --git a/chris_backend/pacsfiles/tests/test_views.py b/chris_backend/pacsfiles/tests/test_views.py index b4c70be5..cd044282 100755 --- a/chris_backend/pacsfiles/tests/test_views.py +++ b/chris_backend/pacsfiles/tests/test_views.py @@ -90,85 +90,6 @@ class PACSSeriesListViewTests(PACSViewTests): def setUp(self): super(PACSSeriesListViewTests, self).setUp() self.create_read_url = reverse("pacsseries-list") - path = 'SERVICES/PACS/MyPACS/123456-crazy/brain_crazy_study/SAG_T1_MPRAGE1' - self.post = json.dumps( - {"template": {"data": [{"name": "path", "value": path}, - {"name": "ndicom", "value": 1}, - {"name": "PatientID", "value": "12345"}, - {"name": "PatientName", "value": "crazy"}, - {"name": "PatientSex", "value": "O"}, - {"name": "StudyDate", "value": '2020-07-15'}, - {"name": "StudyInstanceUID", - "value": '1.1.3432.54.6545674765.765434'}, - {"name": "StudyDescription", "value": "brain_crazy_study"}, - {"name": "SeriesInstanceUID", - "value": "2.4.3432.54.845674765.763346"}, - {"name": "SeriesDescription", "value": "SAG T1 MPRAGE"}, - {"name": "pacs_name", "value": "MyPACS"}]}}) - - @tag('integration') - def test_integration_pacs_series_create_success(self): - path = 'SERVICES/PACS/MyPACS/123456-crazy/brain_crazy_study/SAG_T1_MPRAGE1' - # upload file to storage - with io.StringIO("test file") as file1: - self.storage_manager.upload_obj(path + '/file2.dcm', file1.read(), - content_type='text/plain') - - # make the POST request using the chris user - self.client.login(username=self.chris_username, password=self.chris_password) - response = self.client.post(self.create_read_url, data=self.post, - content_type=self.content_type) - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - # delete file from storage - self.storage_manager.delete_obj(path + '/file2.dcm') - - def test_pacs_series_create_failure_unauthenticated(self): - response = self.client.post(self.create_read_url, data=self.post, - content_type=self.content_type) - self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) - - def test_pacs_series_create_failure_already_exists(self): - self.client.login(username=self.chris_username, password=self.chris_password) - path = 'SERVICES/PACS/MyPACS/123456-crazy/brain_crazy_study/SAG_T1_MPRAGE' - - owner = User.objects.get(username=self.chris_username) - (series_folder, _) = ChrisFolder.objects.get_or_create(path=path, - owner=owner) - pacs = PACS.objects.get(identifier='MyPACS') - PACSSeries.objects.get_or_create(PatientID='123456', - StudyDate='2020-07-15', - StudyInstanceUID='1.1.3432.54.6545674765.765434', - SeriesInstanceUID='2.4.3432.54.845674765.763345', - pacs=pacs, - folder=series_folder) - - post = json.dumps( - {"template": {"data": [{"name": "path", "value": path}, - {"name": "ndicom", "value": 1}, - {"name": "PatientID", "value": "12345"}, - {"name": "PatientName", "value": "crazy"}, - {"name": "PatientSex", "value": "O"}, - {"name": "StudyDate", "value": '2020-07-15'}, - {"name": "StudyInstanceUID", - "value": '1.1.3432.54.6545674765.765434'}, - {"name": "StudyDescription", - "value": "brain_crazy_study"}, - {"name": "SeriesInstanceUID", - "value": "2.4.3432.54.845674765.763345"}, - {"name": "SeriesDescription", - "value": "SAG T1 MPRAGE"}, - {"name": "pacs_name", "value": "MyPACS"}]}}) - - response = self.client.post(self.create_read_url, data=post, - content_type=self.content_type) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - - def test_pacs_series_create_failure_permission_denied_not_chris_user(self): - self.client.login(username=self.username, password=self.password) - response = self.client.post(self.create_read_url, data=self.post, - content_type=self.content_type) - self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) def test_pacs_series_list_success(self): self.client.login(username=self.username, password=self.password) diff --git a/chris_backend/pacsfiles/views.py b/chris_backend/pacsfiles/views.py index d01e5204..7cb35fb7 100755 --- a/chris_backend/pacsfiles/views.py +++ b/chris_backend/pacsfiles/views.py @@ -16,7 +16,7 @@ class PACSSeriesList(generics.ListCreateAPIView): """ A view for the collection of PACS Series. """ - http_method_names = ['get', 'post'] + http_method_names = ['get'] queryset = PACSSeries.objects.all() serializer_class = PACSSeriesSerializer permission_classes = (permissions.IsAuthenticated, IsChrisOrIsPACSUserReadOnly,) diff --git a/docker-compose_just.yml b/docker-compose_just.yml index 4ad7bb4d..f3ea8c62 100755 --- a/docker-compose_just.yml +++ b/docker-compose_just.yml @@ -39,6 +39,8 @@ services: condition: service_healthy rabbitmq: condition: service_started + nats: + condition: service_started cube-nonroot-user-volume-fix: condition: service_completed_successfully networks: @@ -148,6 +150,19 @@ services: - local userns_mode: "host" + nats: + image: docker.io/library/nats:2.10.20-alpine3.20 + ports: + - "4222:4222" + - "8222:8222" + networks: + local: + healthcheck: + test: wget http://localhost:8222/healthz -q -S -O - + start_period: 20s + retries: 3 + timeout: 10s + interval: 5s lldap: image: docker.io/nitnelave/lldap:stable ports: diff --git a/githubActions/main.js b/githubActions/main.js index ca7d97be..423385df 100755 --- a/githubActions/main.js +++ b/githubActions/main.js @@ -6,6 +6,19 @@ const JUST_COMMAND = process.env.INPUT_COMMAND; const script = ` set -x just prefer ${CONTAINER_ENGINE} + +# start-up is being retried as a workaround for +# https://github.com/FNNDSC/ChRIS_ultron_backEnd/issues/573 +for i in {1..5}; do + just start-ancillary && start=good && break + echo "::warning ::Ancillary services failed to start. Attempt=$i" +done + +if [ "$start" != "good" ]; then + echo "::error ::Failed to start ancillary services." + exit 1 +fi + just ${JUST_COMMAND} rc=$? if [ "$rc" != '0' ]; then diff --git a/requirements/base.txt b/requirements/base.txt index 255fb9a2..6014b949 100755 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -14,3 +14,5 @@ django-auth-ldap==4.5.0 PyYAML==6.0.1 whitenoise[brotli]==6.5.0 PyJWT===2.8.0 +channels[daphne]==4.1.0 +nats-py==2.9.0 \ No newline at end of file diff --git a/unregister_pacsfiles.sh b/unregister_pacsfiles.sh new file mode 100755 index 00000000..9b35f185 --- /dev/null +++ b/unregister_pacsfiles.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +docker compose -f docker-compose_dev.yml exec chris_dev pip install tqdm +docker compose -f docker-compose_dev.yml exec chris_dev python manage.py shell -c ' +from tqdm import tqdm +from pacsfiles.models import PACSSeries, PACSFile + +with tqdm(PACSFile.objects.all()) as pbar: + for pacs_file in pbar: + _ = pacs_file.delete() + +with tqdm(PACSSeries) as pbar: + for pacs_series in pbar: + _ = pacs_series.delete() +'