Skip to content
This repository has been archived by the owner on May 18, 2023. It is now read-only.

Feature: support celery5 and python38 #32

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions celery_pool_asyncio/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
from ..environment_variables import monkey_available


if monkey_available('AMQP_BACKEND'):
from . import amqp # noqa
amqp.__package__

if monkey_available('RPC_BACKEND'):
from . import rpc # noqa
rpc.__package__
72 changes: 0 additions & 72 deletions celery_pool_asyncio/backends/amqp.py

This file was deleted.

3 changes: 2 additions & 1 deletion celery_pool_asyncio/tracer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import logging
import asyncio
import time


from celery import canvas
Expand Down Expand Up @@ -59,7 +60,7 @@ def build_async_tracer(
eager=False,
propagate=False,
app=None,
monotonic=trace.monotonic,
monotonic=time.monotonic,
trace_ok_t=trace.trace_ok_t,
IGNORE_STATES=trace.IGNORE_STATES,
):
Expand Down
33 changes: 16 additions & 17 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
celery==4.2.1
celery

setuptools>=38.6.0
pip==18.0
bumpversion==0.5.3
wheel>=0.31
watchdog==0.8.3
flake8~=3.5
tox>=2.9
coverage>=4.5
Sphinx==1.7.6
twine>=1.11.0
future>=0.16.0
futures>=3.2; python_version <= "2.7"
dill>=0.2.7; python_version <= "2.7"
setuptools
pip
bumpversion
wheel
watchdog
flake8
tox
coverage
Sphinx
twine
future

pytest>=3.6
pytest-cov>=2.5.1
pytest-runner>=4.2
pytest
pytest-celery
pytest-cov
pytest-runner
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
packages=find_packages(),
setup_requires=setup_requirements,
test_suite='tests',
#tests_require=test_requirements,
url='https://github.com/kai3341/celery-pool-asyncio',
version='0.2.0',
zip_safe=True,
Expand Down
67 changes: 26 additions & 41 deletions tests/test_celery_pool_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
#!/usr/bin/env python

import pathlib
import asyncio
import time
from pprint import pformat
#from concurrent.futures import TimeoutError

import pytest
from kombu import Queue


from celery_pool_asyncio import TaskPool
import pathlib

root = pathlib.Path()
brocker = root / 'brocker'
Expand All @@ -25,45 +15,40 @@
dir_processed = brocker / 'processed'
dir_processed.mkdir(exist_ok=True)


async def tack_function(input_data):
await asyncio.sleep(1)
return input_data.upper()


@pytest.fixture(scope='session')
def celery_config():
return {
def test_create_task():
from celery.contrib.testing.app import TestApp
app = TestApp(config={
'broker_url': 'filesystem:// %s' % dir_messages,
'broker_transport_options': {
'data_folder_in': '%s' % dir_out,
'data_folder_out': '%s' % dir_out,
'data_folder_processed': '%s' % dir_processed,
},
'result_persistent': True,
}


@pytest.fixture(scope='session')
def celery_includes():
return [
'tests.test_celery_pool_asyncio',
]


@pytest.fixture(scope='session')
def celery_worker_pool():
return 'celery_pool_asyncio:TaskPool'


def test_create_task(celery_app, celery_worker):
@celery_app.task
async def tack_function(input_data):
await asyncio.sleep(1)
print('tack_function', input_data)
return input_data.upper()

task = tack_function.delay('hello, world!')
print('task', task)
result = task.get(timeout=10)
print('result', result)

'worker_pool': 'celery_pool_asyncio:TaskPool',
})
wrapped = app.task(tack_function)
app.register_task(wrapped)
msg = 'hello, world!'
loop = asyncio.get_event_loop()
if False:
#
# This works.
#
task = wrapped(msg)
reply = loop.run_until_complete(task)
else:
#
# This times out.
#
task = wrapped.delay(msg)
reply = loop.run_until_complete(task)
reply = reply.get(timeout=10)
loop.close()
assert reply == msg.upper()
5 changes: 3 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
[tox]
envlist = py27, py35, py36, flake8
envlist = py38, flake8

[travis]
python =
3.8: py38
3.6: py36
3.5: py35
2.7: py27

[testenv:flake8]
basepython = python
deps = flake8
commands = flake8 celery_executor
commands = flake8 celery_pool_asyncio

[testenv]
setenv =
Expand Down