This repository has been archived by the owner on May 18, 2023. It is now read-only.

Celery redbeat scheduler compatibility #20

Open
codesutras opened this issue May 26, 2020 · 12 comments

Comments

@codesutras

codesutras commented May 26, 2020

  • Celery Executor version: 4.4.2
  • Python version: 3.8
  • Operating System: Ubuntu 18.04

Description

I get the exception below when starting celery beat with --scheduler celery_pool_asyncio:PersistentScheduler, and beat does not send tasks to the worker.

Expected behavior: celery beat should start normally and send tasks to the workers.

What I Did

I ran the command below and got an exception.

celery beat -A my.app --scheduler celery_pool_asyncio:PersistentScheduler

erval -> 5.00 seconds (5s)
[2020-05-26 21:57:14,472: DEBUG/MainProcess] Setting default socket timeout to 30
[2020-05-26 21:57:14,473: INFO/MainProcess] beat: Starting...
[2020-05-26 21:57:14,498: DEBUG/MainProcess] Current schedule:

[2020-05-26 21:57:14,499: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2020-05-26 21:57:14,500: DEBUG/MainProcess] Using selector: EpollSelector
[2020-05-26 21:57:14,504: WARNING/MainProcess] Traceback (most recent call last):
[2020-05-26 21:57:14,504: WARNING/MainProcess] File "/home/jayant29/anaconda3/envs/trade_env/lib/python3.8/site-packages/celery_pool_asyncio/beat.py", line 23, in Service__async_start
    await self.async_run()
[2020-05-26 21:57:14,504: WARNING/MainProcess] File "/home/jayant29/anaconda3/envs/trade_env/lib/python3.8/site-packages/celery_pool_asyncio/beat.py", line 10, in Service__async_run
    interval = await self.scheduler.tick()
[2020-05-26 21:57:14,505: WARNING/MainProcess] TypeError: object int can't be used in 'await' expression
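
The TypeError above is what Python raises when a plain synchronous method is awaited. A minimal sketch of that failure mode, assuming a scheduler whose tick() returns an int directly (SyncScheduler and AsyncScheduler are hypothetical stand-ins, not classes from celery_pool_asyncio or redbeat):

```python
import asyncio


class SyncScheduler:
    """Hypothetical scheduler with a regular (non-async) tick()."""

    def tick(self):
        return 5  # seconds until the next tick


class AsyncScheduler:
    """Hypothetical scheduler whose tick() is a coroutine."""

    async def tick(self):
        return 5


async def run_once(scheduler):
    # Mirrors the failing line in the traceback:
    #     interval = await self.scheduler.tick()
    return await scheduler.tick()


def try_tick(scheduler):
    """Return the interval, or a string describing the TypeError."""
    try:
        return asyncio.run(run_once(scheduler))
    except TypeError as exc:
        return "TypeError: {}".format(exc)


if __name__ == "__main__":
    print(try_tick(AsyncScheduler()))
    print(try_tick(SyncScheduler()))
```

The async variant returns the interval, while the sync variant fails because the int returned by tick() is not awaitable. This suggests the scheduler object in the traceback was not one with a coroutine tick().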
@kai3341
Owner

kai3341 commented May 28, 2020

I can't reproduce this exception. Can you create a minimal project with a single empty task that reproduces the problem?
Are you sure you are running beat on Python 3.7? I see /lib/python3.8/site-packages/ in the path.
It's weird, but for some reason self.scheduler is not celery_pool_asyncio:PersistentScheduler.
Try rebuilding your virtualenv; that may be a cheap fix.
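
One way to check which scheduler class a "module:Class" string points at, and whether its tick() is a coroutine, is sketched below. The helper names resolve and is_async_method are hypothetical, and the resolver is a simplified stdlib version rather than the lookup Celery itself performs.

```python
import importlib
import inspect


def resolve(path):
    """Resolve a 'module:attribute' string to the object it names."""
    module_name, _, attr = path.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def is_async_method(cls, name):
    """True if cls.<name> exists and is defined with 'async def'."""
    return inspect.iscoroutinefunction(getattr(cls, name, None))


if __name__ == "__main__":
    # Demonstrated on a stdlib class so the sketch runs anywhere.
    lock_cls = resolve("asyncio:Lock")
    print(is_async_method(lock_cls, "acquire"))  # async def
    print(is_async_method(lock_cls, "release"))  # regular def
```

With celery_pool_asyncio installed, the same check could be run as is_async_method(resolve("celery_pool_asyncio:PersistentScheduler"), "tick") inside the environment that runs beat.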

@codesutras
Author

I've corrected the Python version to 3.8 in my issue description. I'll share a small project with a single task soon. Thanks for your quick reply.

@codesutras
Author

@kai3341 I've rechecked my code and found that celery_pool_asyncio is working as expected. Here is a working example of it.

However, I have observed that this code is not compatible with the redbeat scheduler. For obvious reasons, we need to be able to edit and remove tasks at runtime, and redbeat provides those capabilities.

By any chance, are you planning to work on such features, or would we need to get in touch with @sibson to update the redbeat source code to make it compatible with coroutine tasks?

Meanwhile, I just wanted to express my thanks for the great work you guys have done for the community.

@kai3341
Owner

kai3341 commented May 31, 2020

@codesutras I think I'll solve this problem

kai3341 reopened this on May 31, 2020
kai3341 changed the title from "Celery beat not sending task" to "Celery redbeat scheduler compatibility" on May 31, 2020
@codesutras
Author

@codesutras I think I'll solve this problem

Perfect... Thanks a lot for taking it up.

@auvipy

auvipy commented Jul 23, 2020

this will be a great thing!

@kai3341
Owner

kai3341 commented Jul 23, 2020

@auvipy now I have some time for it, since I'm on vacation =)
But I'm currently working on issue #22, and I now understand how many things need attention before this pool is ready to merge into Celery upstream =(

@auvipy

auvipy commented Jul 23, 2020

ohho

@zikphil

zikphil commented Sep 9, 2020

@kai3341 Can I offer a bounty for this?

@zikphil

zikphil commented Sep 9, 2020

It might be a hack, but I got it working in a quick-and-dirty way by creating this file and using the RedBeatAsyncScheduler class as the scheduler:

from celery import beat
from redbeat import RedBeatScheduler
from redbeat.schedulers import get_redis, logger


class AsyncSchedulerMixin:
    async def apply_async(self, entry, producer=None, advance=True, **kwargs):
        # Update time-stamps and run counts before we actually execute,
        # so we have that done if an exception is raised (doesn't schedule
        # forever.)
        entry = self.reserve(entry) if advance else entry
        task = self.app.tasks.get(entry.task)

        try:
            entry_args = [
                v() if isinstance(v, beat.BeatLazyFunc) else v
                for v in (entry.args or [])
            ]
            entry_kwargs = {
                k: v() if isinstance(v, beat.BeatLazyFunc) else v
                for k, v in entry.kwargs.items()
            }

            if task:
                return await task.apply_async(
                    entry_args, entry_kwargs,
                    producer=producer,
                    **entry.options,
                )
            else:
                return await self.send_task(
                    entry.task, entry_args, entry_kwargs,
                    producer=producer,
                    **entry.options,
                )
        except Exception as exc:  # pylint: disable=broad-except
            msg = "Couldn't apply scheduled task {entry.name}: {exc}".format(
                entry=entry,
                exc=exc,
            )
            beat.reraise(
                beat.SchedulingError,
                beat.SchedulingError(msg),
                beat.sys.exc_info()[2]
            )
        finally:
            self._tasks_since_sync += 1
            if self.should_sync():
                self._do_sync()

    async def apply_entry(self, entry, producer=None):
        beat.info(
            'Scheduler: Sending due task %s (%s)',
            entry.name,
            entry.task,
        )

        try:
            coro = self.apply_async(
                entry=entry,
                producer=producer,
                advance=False,
            )
            result = await coro
        except Exception as exc:  # pylint: disable=broad-except
            beat.error(
                'Message Error: %s\n%s',
                exc,
                beat.traceback.format_stack(),
                exc_info=True,
            )
        else:
            beat.debug(
                '%s sent. id->%s',
                entry.task,
                result.id,
            )


class RedBeatAsyncScheduler(AsyncSchedulerMixin, RedBeatScheduler):

    async def maybe_due(self, entry, **kwargs):
        is_due, next_time_to_run = entry.is_due()

        if is_due:
            logger.info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
            try:
                result = await self.apply_async(entry, **kwargs)
            except Exception as exc:
                logger.exception('Message Error: %s', exc)
            else:
                logger.debug('%s sent. id->%s', entry.task, result.id)
        return next_time_to_run

    async def tick(self, min=min, **kwargs):
        if self.lock:
            logger.debug('beat: Extending lock...')
            get_redis(self.app).pexpire(self.lock_key, int(self.lock_timeout * 1000))

        remaining_times = []
        try:
            for entry in self.schedule.values():
                next_time_to_run = await self.maybe_due(entry, **self._maybe_due_kwargs)
                if next_time_to_run:
                    remaining_times.append(next_time_to_run)
        except RuntimeError:
            logger.debug('beat: RuntimeError', exc_info=True)

        return min(remaining_times + [self.max_interval])
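
The class ordering in the code above matters: because the mixin is listed first in the bases, its coroutine apply_async takes precedence over the synchronous one inherited from the base scheduler. A stand-in sketch of that pattern, with hypothetical classes that do not import celery or redbeat:

```python
import asyncio


class BaseScheduler:
    """Hypothetical synchronous base, standing in for RedBeatScheduler."""

    max_interval = 5.0

    def __init__(self, entries):
        # entries: list of (task name, seconds until due)
        self.entries = list(entries)

    def apply_async(self, name):
        return "sent {} (sync)".format(name)


class AsyncMixin:
    """Hypothetical mixin, standing in for AsyncSchedulerMixin."""

    async def apply_async(self, name):
        await asyncio.sleep(0)  # placeholder for real async I/O
        return "sent {} (async)".format(name)


class AsyncScheduler(AsyncMixin, BaseScheduler):
    # AsyncMixin comes first in the MRO, so its apply_async is used.

    async def tick(self):
        sent, remaining = [], []
        for name, delay in self.entries:
            if delay <= 0:
                sent.append(await self.apply_async(name))
            else:
                remaining.append(delay)
        # Sleep until the nearest entry, capped by max_interval.
        return sent, min(remaining + [self.max_interval])


if __name__ == "__main__":
    scheduler = AsyncScheduler([("a", 0), ("b", 2.5)])
    print(asyncio.run(scheduler.tick()))
```

Reversing the base order would leave the synchronous apply_async in place, and awaiting its result would fail.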

@kai3341
Owner

kai3341 commented Sep 12, 2020

@zikphil great job! It's almost OK, but the IO-bound Redis operations must be wrapped for async/await via asgiref.sync_to_async.
I hope I will be available next week.
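
The idea behind that suggestion is to keep blocking Redis calls off the event loop. A minimal sketch of the same idea using the stdlib loop.run_in_executor as a stand-in for asgiref.sync_to_async (a swapped-in technique, not the asgiref API); blocking_pexpire is a hypothetical placeholder for a blocking Redis call and does not talk to Redis:

```python
import asyncio
import functools
import time


def blocking_pexpire(key, ttl_ms):
    """Hypothetical stand-in for a blocking Redis call such as pexpire."""
    time.sleep(0.05)  # simulate network latency
    return True


async def extend_lock(key, ttl_ms):
    # Run the blocking call in the default thread pool so the event
    # loop stays free for other coroutines in the meantime.
    loop = asyncio.get_running_loop()
    call = functools.partial(blocking_pexpire, key, ttl_ms)
    return await loop.run_in_executor(None, call)


async def heartbeat(ticks):
    # Other work that should keep running while the lock is extended.
    for _ in range(3):
        await asyncio.sleep(0.01)
        ticks.append(1)


async def main():
    ticks = []
    extended, _ = await asyncio.gather(
        extend_lock("hypothetical-lock-key", 5000),
        heartbeat(ticks),
    )
    return extended, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the scheduler posted earlier in the thread, the call that would need this treatment is the pexpire on the lock key inside tick().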

@kai3341
Owner

kai3341 commented Sep 15, 2020

@zikphil @codesutras please check #28
