Skip to content
This repository has been archived by the owner on May 18, 2023. It is now read-only.

Commit

Permalink
Get the pool configured, and the worker started. The key issue is
Browse files Browse the repository at this point in the history
the interaction between .delay() and loop.run_until_complete(task).
  • Loading branch information
ShaheedHaque committed Aug 30, 2021
1 parent 149696f commit 471aedf
Showing 1 changed file with 26 additions and 41 deletions.
67 changes: 26 additions & 41 deletions tests/test_celery_pool_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
#!/usr/bin/env python

import pathlib
import asyncio
import time
from pprint import pformat
#from concurrent.futures import TimeoutError

import pytest
from kombu import Queue


from celery_pool_asyncio import TaskPool
import pathlib

root = pathlib.Path()
brocker = root / 'brocker'
Expand All @@ -25,45 +15,40 @@
dir_processed = brocker / 'processed'
dir_processed.mkdir(exist_ok=True)


async def tack_function(input_data):
await asyncio.sleep(1)
return input_data.upper()


@pytest.fixture(scope='session')
def celery_config():
return {
def test_create_task():
from celery.contrib.testing.app import TestApp
app = TestApp(config={
'broker_url': 'filesystem:// %s' % dir_messages,
'broker_transport_options': {
'data_folder_in': '%s' % dir_out,
'data_folder_out': '%s' % dir_out,
'data_folder_processed': '%s' % dir_processed,
},
'result_persistent': True,
}


@pytest.fixture(scope='session')
def celery_includes():
return [
'test_celery_pool_asyncio',
]


@pytest.fixture(scope='session')
def celery_worker_pool():
return 'celery_pool_asyncio:TaskPool'


def test_create_task(celery_app, celery_worker):
@celery_app.task
async def tack_function(input_data):
await asyncio.sleep(1)
print('tack_function', input_data)
return input_data.upper()

task = tack_function.delay('hello, world!')
print('task', task)
result = task.get(timeout=10)
print('result', result)

'worker_pool': 'celery_pool_asyncio:TaskPool',
})
wrapped = app.task(tack_function)
app.register_task(wrapped)
msg = 'hello, world!'
loop = asyncio.get_event_loop()
if False:
#
# This works.
#
task = wrapped(msg)
reply = loop.run_until_complete(task)
else:
#
# This times out.
#
task = wrapped.delay(msg)
reply = loop.run_until_complete(task)
reply = reply.get(timeout=10)
loop.close()
assert reply == msg.upper()

0 comments on commit 471aedf

Please sign in to comment.