This repository has been archived by the owner on May 18, 2023. It is now read-only.

Feature: support celery5 and python38 #32

Open
wants to merge 3 commits into master from feature_celery5_python38

Conversation

@ShaheedHaque commented Apr 10, 2021

This is NOT ready to be merged.

The eventual goal of this PR is to address #17, #23 and #29. So far, only the bare minimum of changes has been applied: enough to launch tox, which then hangs like so:

$ tox -r
GLOB sdist-make: /main/srhaque/kdedev/celery-pool-asyncio/setup.py
py38 recreate: /main/srhaque/kdedev/celery-pool-asyncio/.tox/py38
py38 installdeps: -r/main/srhaque/kdedev/celery-pool-asyncio/requirements_dev.txt
py38 inst: /main/srhaque/kdedev/celery-pool-asyncio/.tox/.tmp/package/1/celery-pool-asyncio-0.2.0.zip
py38 installed: alabaster==0.7.12,...,zipp==3.4.1
py38 run-test-pre: PYTHONHASHSEED='3791355490'
py38 run-test: commands[0] | pip install -U pip
Requirement already satisfied: pip in ./.tox/py38/lib/python3.8/site-packages (20.3.3)
Collecting pip
  Using cached pip-21.0.1-py3-none-any.whl (1.5 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 20.3.3
    Uninstalling pip-20.3.3:
      Successfully uninstalled pip-20.3.3
Successfully installed pip-21.0.1
py38 run-test: commands[1] | py.test --basetemp=/main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/tmp
================================================================================ test session starts =================================================================================
platform linux -- Python 3.8.5, pytest-6.2.3, py-1.10.0, pluggy-0.13.1 -- /main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/bin/python
cachedir: .tox/py38/.pytest_cache
rootdir: /main/srhaque/kdedev/celery-pool-asyncio, configfile: setup.cfg
plugins: cov-2.11.1
collected 1 item                                                                                                                                                                     

tests/test_celery_pool_asyncio.py::test_create_task ^CERROR: got KeyboardInterrupt signal

The proximate reason for the hang is that https://github.com/celery/celery/blob/4f2213a427861cf42b778ef499f29b179d8c40ed/celery/contrib/testing/worker.py#L46 says:

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.

and though the test does do this, somehow the worker thread never starts.
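For context, the standard way to satisfy that warning is Celery's own start_worker context manager (celery.contrib.testing.worker), which launches the worker in a background thread and pings it before yielding. The following is a minimal sketch under that assumption, not this repository's actual test; the add task exists only for illustration.

# Minimal sketch, assuming TestApp's default in-memory broker/backend.
import celery.contrib.testing.tasks  # noqa: F401  ensures the 'celery.ping' task used by the ping check is registered
from celery.contrib.testing.app import TestApp
from celery.contrib.testing.worker import start_worker

app = TestApp()

@app.task
def add(x, y):
    return x + y

# start_worker() runs the worker in a background thread and only yields
# once the worker answers a ping, so .delay() has a live consumer.
with start_worker(app, perform_ping_check=True):
    result = add.delay(2, 2)
    assert result.get(timeout=10) == 4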

@ShaheedHaque force-pushed the feature_celery5_python38 branch from 3b39bd8 to 149696f on April 10, 2021 at 13:26
@ShaheedHaque force-pushed the feature_celery5_python38 branch from 2db71b5 to 6daaf8b on August 12, 2021 at 18:28
(commit message, truncated): … the interaction between .delay() and loop.run_until_complete(task).
@ShaheedHaque force-pushed the feature_celery5_python38 branch from 6daaf8b to 471aedf on August 30, 2021 at 10:07
@ShaheedHaque (Author) commented:

Some small progress. Now, the test fails as follows:

$ tox
...
py38 run-test: commands[1] | py.test --basetemp=/main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/tmp
========================================================== test session starts ===========================================================
platform linux -- Python 3.8.10, pytest-6.2.4, py-1.10.0, pluggy-0.13.1 -- /main/srhaque/kdedev/celery-pool-asyncio/.tox/py38/bin/python
cachedir: .tox/py38/.pytest_cache
rootdir: /main/srhaque/kdedev/celery-pool-asyncio, configfile: setup.cfg
plugins: celery-0.0.0, cov-2.12.1
collected 1 item                                                                                                                         

tests/test_celery_pool_asyncio.py::test_create_task FAILED

================================================================ FAILURES ================================================================
____________________________________________________________ test_create_task ____________________________________________________________

    def test_create_task():
        from celery.contrib.testing.app import TestApp
        app = TestApp(config={
            'broker_url': 'filesystem:// %s' % dir_messages,
            'broker_transport_options': {
                'data_folder_in': '%s' % dir_out,
                'data_folder_out': '%s' % dir_out,
                'data_folder_processed': '%s' % dir_processed,
            },
            'result_persistent': True,
            'worker_pool': 'celery_pool_asyncio:TaskPool',
        })
        wrapped = app.task(tack_function)
        app.register_task(wrapped)
        msg = 'hello, world!'
        loop = asyncio.get_event_loop()
        if False:
            #
            # This works.
            #
            task = wrapped(msg)
            reply = loop.run_until_complete(task)
        else:
            #
            # This times out.
            #
            task = wrapped.delay(msg)
            reply = loop.run_until_complete(task)
>           reply = reply.get(timeout=10)

tests/test_celery_pool_asyncio.py:62: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.tox/py38/lib/python3.8/site-packages/celery/result.py:223: in get
    return self.backend.wait_for_pending(
.tox/py38/lib/python3.8/site-packages/celery/backends/base.py:703: in wait_for_pending
    meta = self.wait_for(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <celery.backends.cache.CacheBackend object at 0x7f0c217eff40>, task_id = '1dc86723-d462-4306-aea0-ab78cd9082e0', timeout = 10
interval = 0.5, no_ack = True, on_interval = <promise@0x7f0c22073b80>

    def wait_for(self, task_id,
                 timeout=None, interval=0.5, no_ack=True, on_interval=None):
        """Wait for task and return its result.
    
        If the task raises an exception, this exception
        will be re-raised by :func:`wait_for`.
    
        Raises:
            celery.exceptions.TimeoutError:
                If `timeout` is not :const:`None`, and the operation
                takes longer than `timeout` seconds.
        """
        self._ensure_not_eager()
    
        time_elapsed = 0.0
    
        while 1:
            meta = self.get_task_meta(task_id)
            if meta['status'] in states.READY_STATES:
                return meta
            if on_interval:
                on_interval()
            # avoid hammering the CPU checking status.
            time.sleep(interval)
            time_elapsed += interval
            if timeout and time_elapsed >= timeout:
>               raise TimeoutError('The operation timed out.')
E               celery.exceptions.TimeoutError: The operation timed out.

.tox/py38/lib/python3.8/site-packages/celery/backends/base.py:739: TimeoutError

As per the commit comment, the key difference between the working and non-working paths in the test is the interaction between the call to .delay() and loop.run_until_complete(task). I've not yet been able to understand how .delay() or .apply_async() actually work, not least in the test environment without a real broker; that is probably the key to this.
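For what it's worth, .delay(...) is just the star-argument shorthand for .apply_async(): it serializes the call into a message, publishes it to the configured broker, and immediately returns an AsyncResult, which only becomes ready once a worker consumes and executes the message. Below is a hedged sanity-check sketch, not the project's test: the echo task and config values are assumptions, and eager mode only takes the broker/worker transport out of the picture; it does not exercise the asyncio pool at all.

# Sketch only: shows what .delay() resolves to and how eager mode bypasses
# the broker entirely.
from celery import Celery

app = Celery(broker='memory://', backend='cache+memory://')
app.conf.task_always_eager = True       # run tasks in-process, no broker/worker
app.conf.task_eager_propagate = True    # re-raise task exceptions in the caller

@app.task
def echo(msg):
    return msg

# echo.delay('hi') is equivalent to echo.apply_async(('hi',)).
# With task_always_eager the task body runs synchronously and an EagerResult
# is returned, so .get() never has to wait on a worker.
result = echo.delay('hello, world!')
assert result.get(timeout=10) == 'hello, world!'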

@the-wondersmith commented:

@ShaheedHaque @kai3341 I figured out what the issue with the test(s) is. I've got a fork of @ShaheedHaque's work and have been working off of that.

The short version is that the issue comes from the way the test suite is (not) using the celery_session_app and celery_session_worker fixtures. At the time of this writing (~7:40 PM EST 10/23/2022), I've got it all working such that the test suite does at least appear to be calling the dummy task_function in response to task.delay(message) being called. However, it currently hangs there indefinitely (as opposed to raising a timeout error as it previously did).

I'm 90% sure it's hanging because of the way the Celery fixtures use threads, along with the fact that I've added breakpoints to various functions to help me debug the issue. The short version is that I don't think I'm too far off from having everything working properly, but I'll keep this thread updated and open a PR when I've got it all 100%.
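For reference, those fixtures come from celery.contrib.pytest. A minimal sketch of how a conftest.py is usually wired up for them follows; the pool string, config values, echo task and reload() call are illustrative assumptions, not this suite's actual settings.

# Hypothetical conftest.py / test sketch using Celery's pytest fixtures.
import pytest

pytest_plugins = ('celery.contrib.pytest',)  # enables the celery_* fixtures if the plugin is not auto-loaded

@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'memory://',
        'result_backend': 'cache+memory://',
    }

@pytest.fixture(scope='session')
def celery_worker_pool():
    # Ask the session worker to run on the pool under test.
    return 'celery_pool_asyncio:TaskPool'

def test_create_task(celery_session_app, celery_session_worker):
    @celery_session_app.task
    def echo(msg):
        return msg

    # Nudge the already-running session worker to pick up the newly defined task.
    celery_session_worker.reload()
    assert echo.delay('hello, world!').get(timeout=10) == 'hello, world!'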

@ShaheedHaque (Author) commented:

Great progress, sounds promising! Let me know if I can help.

@the-wondersmith commented:

@ShaheedHaque Hoo boy. OK, so, I think I've got a handle on what the actual issue is, but I'm... really not sure what to do about it. Any chance you have Discord or anything more real-time than this thread? I'd love to go over it with you, and I'd bet we could get it knocked out in an afternoon.

@ShaheedHaque (Author) commented Oct 24, 2022

I've not looked at the code in some months, so I'm rather stale as a result. That said, I'd be happy to do (say) a Skype session so we can share screens etc. I'm in London, and hence on UK time. Send me an email at <username>@gmail.com and we can arrange the details.

@the-wondersmith commented:

@ShaheedHaque I sent off an email this morning; it should show up as the@<username>.dev. Looking forward to getting this working 😁
