diff --git a/tests/test_celery_pool_asyncio.py b/tests/test_celery_pool_asyncio.py index e87a026..5576ace 100644 --- a/tests/test_celery_pool_asyncio.py +++ b/tests/test_celery_pool_asyncio.py @@ -1,16 +1,6 @@ #!/usr/bin/env python - -import pathlib import asyncio -import time -from pprint import pformat -#from concurrent.futures import TimeoutError - -import pytest -from kombu import Queue - - -from celery_pool_asyncio import TaskPool +import pathlib root = pathlib.Path() brocker = root / 'brocker' @@ -25,14 +15,15 @@ dir_processed = brocker / 'processed' dir_processed.mkdir(exist_ok=True) + async def tack_function(input_data): await asyncio.sleep(1) return input_data.upper() -@pytest.fixture(scope='session') -def celery_config(): - return { +def test_create_task(): + from celery.contrib.testing.app import TestApp + app = TestApp(config={ 'broker_url': 'filesystem:// %s' % dir_messages, 'broker_transport_options': { 'data_folder_in': '%s' % dir_out, @@ -40,30 +31,24 @@ def celery_config(): 'data_folder_processed': '%s' % dir_processed, }, 'result_persistent': True, - } - - -@pytest.fixture(scope='session') -def celery_includes(): - return [ - 'test_celery_pool_asyncio', - ] - - -@pytest.fixture(scope='session') -def celery_worker_pool(): - return 'celery_pool_asyncio:TaskPool' - - -def test_create_task(celery_app, celery_worker): - @celery_app.task - async def tack_function(input_data): - await asyncio.sleep(1) - print('tack_function', input_data) - return input_data.upper() - - task = tack_function.delay('hello, world!') - print('task', task) - result = task.get(timeout=10) - print('result', result) - + 'worker_pool': 'celery_pool_asyncio:TaskPool', + }) + wrapped = app.task(tack_function) + app.register_task(wrapped) + msg = 'hello, world!' + loop = asyncio.get_event_loop() + if False: + # + # This works. + # + task = wrapped(msg) + reply = loop.run_until_complete(task) + else: + # + # This times out. + # + task = wrapped.delay(msg) + reply = loop.run_until_complete(task) + reply = reply.get(timeout=10) + loop.close() + assert reply == msg.upper()