From a4df1266b75536f3d0645cf4d701139c0b75bfb5 Mon Sep 17 00:00:00 2001
From: Laura Barcziova
Date: Wed, 31 Jan 2024 12:11:18 +0100
Subject: [PATCH 1/2] Set lower hard time limit for process_message

This task should be executed much faster, as it doesn't run any
user-defined actions, so set its time limit lower than the global one
(900s).

Related to #2290
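
Illustration (not part of this patch; the app and task names below are
made up): how a hard time limit is attached to a Celery task and how it
differs from the soft one.

    # minimal sketch of hard vs. soft time limits on a Celery task
    from celery import Celery

    demo_app = Celery("demo")

    @demo_app.task(
        time_limit=300,       # hard limit: the worker child is killed after 300s
        soft_time_limit=270,  # optional: raises SoftTimeLimitExceeded in the task first
    )
    def slow_task():
        ...

When the hard limit is hit, the process executing the task is terminated
and replaced, so the task gets no chance to clean up; that is why lowering
it is only safe for tasks known to finish quickly, like process_message.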
---
 packit_service/worker/tasks.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/packit_service/worker/tasks.py b/packit_service/worker/tasks.py
index 8ad2c815c..1f7114353 100644
--- a/packit_service/worker/tasks.py
+++ b/packit_service/worker/tasks.py
@@ -149,7 +149,11 @@ class BodhiHandlerTaskWithRetry(HandlerTaskWithRetry):
 
 
 @celery_app.task(
-    name=getenv("CELERY_MAIN_TASK_NAME") or CELERY_DEFAULT_MAIN_TASK_NAME, bind=True
+    name=getenv("CELERY_MAIN_TASK_NAME") or CELERY_DEFAULT_MAIN_TASK_NAME,
+    bind=True,
+    # set a lower time limit for process message as for other tasks
+    # https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.time_limit
+    time_limit=300,
 )
 def process_message(
     self, event: dict, source: Optional[str] = None, event_type: Optional[str] = None

From 76169753b4bfdfe9542bcaeb88ed6df8c476c1e6 Mon Sep 17 00:00:00 2001
From: Laura Barcziova
Date: Wed, 31 Jan 2024 12:13:19 +0100
Subject: [PATCH 2/2] Make process_message task use TaskWithRetry as base

Rename HandlerTaskWithRetry to TaskWithRetry and make the
process_message task use it as its base as well, so that
process_message tasks are also retried.

Fixes #2323
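
Background sketch (illustration only, not project code; the class and
task names are invented, only the attribute shape mirrors TaskWithRetry):
with a Celery base class that sets autoretry_for, any exception escaping
the task body makes the task retry until max_retries is reached, which is
the behaviour process_message now inherits via base=TaskWithRetry.

    # minimal sketch of a retrying base Task wired in via base=
    from celery import Celery, Task

    demo_app = Celery("demo")

    class RetryingTask(Task):
        autoretry_for = (Exception,)     # retry on any unhandled exception
        max_retries = 2                  # give up after two retries
        retry_kwargs = {"max_retries": max_retries}
        retry_backoff = 30               # exponential backoff starting at 30s
        acks_late = True                 # ack the message only after the task ran

    @demo_app.task(bind=True, base=RetryingTask)
    def flaky_task(self):
        raise RuntimeError("transient failure")  # run 3x in total, then marked failed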
---
 packit_service/worker/tasks.py         | 45 +++++++++++++-------------
 tests/integration/test_bodhi_update.py |  6 ++--
 2 files changed, 25 insertions(+), 26 deletions(-)

diff --git a/packit_service/worker/tasks.py b/packit_service/worker/tasks.py
index 1f7114353..e378e3035 100644
--- a/packit_service/worker/tasks.py
+++ b/packit_service/worker/tasks.py
@@ -130,7 +130,7 @@ def format(self, record):
     log_package_versions(package_versions)
 
 
-class HandlerTaskWithRetry(Task):
+class TaskWithRetry(Task):
     autoretry_for = (Exception,)
     max_retries = int(getenv("CELERY_RETRY_LIMIT", DEFAULT_RETRY_LIMIT))
     retry_kwargs = {"max_retries": max_retries}
@@ -140,7 +140,7 @@ class HandlerTaskWithRetry(Task):
     acks_late = True
 
 
-class BodhiHandlerTaskWithRetry(HandlerTaskWithRetry):
+class BodhiTaskWithRetry(TaskWithRetry):
     # hardcode for creating bodhi updates to account for the tagging race condition
     max_retries = 5
     # also disable jitter for the same reason
@@ -154,6 +154,7 @@ class BodhiHandlerTaskWithRetry(HandlerTaskWithRetry):
     # set a lower time limit for process message as for other tasks
     # https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.time_limit
     time_limit=300,
+    base=TaskWithRetry,
 )
 def process_message(
     self, event: dict, source: Optional[str] = None, event_type: Optional[str] = None
@@ -192,7 +193,7 @@ def babysit_copr_build(self, build_id: int):
 
 
 # tasks for running the handlers
-@celery_app.task(name=TaskName.copr_build_start, base=HandlerTaskWithRetry)
+@celery_app.task(name=TaskName.copr_build_start, base=TaskWithRetry)
 def run_copr_build_start_handler(event: dict, package_config: dict, job_config: dict):
     handler = CoprBuildStartHandler(
         package_config=load_package_config(package_config),
@@ -202,7 +203,7 @@ def run_copr_build_start_handler(event: dict, package_config: dict, job_config:
     return get_handlers_task_results(handler.run_job(), event)
 
 
-@celery_app.task(name=TaskName.copr_build_end, base=HandlerTaskWithRetry)
+@celery_app.task(name=TaskName.copr_build_end, base=TaskWithRetry)
 def run_copr_build_end_handler(event: dict, package_config: dict, job_config: dict):
     handler = CoprBuildEndHandler(
         package_config=load_package_config(package_config),
@@ -213,7 +214,7 @@ def run_copr_build_end_handler(event: dict, package_config: dict, job_config: di
 
 
 @celery_app.task(
-    bind=True, name=TaskName.copr_build, base=HandlerTaskWithRetry, queue="long-running"
+    bind=True, name=TaskName.copr_build, base=TaskWithRetry, queue="long-running"
 )
 def run_copr_build_handler(
     self,
@@ -232,7 +233,7 @@ def run_copr_build_handler(
     return get_handlers_task_results(handler.run_job(), event)
 
 
-@celery_app.task(name=TaskName.installation, base=HandlerTaskWithRetry)
+@celery_app.task(name=TaskName.installation, base=TaskWithRetry)
 def run_installation_handler(event: dict, package_config: dict, job_config: dict):
     handler = GithubAppInstallationHandler(
         package_config=None, job_config=None, event=event
@@ -240,7 +241,7 @@ def run_installation_handler(event: dict, package_config: dict, job_config: dict
     return get_handlers_task_results(handler.run_job(), event)
 
 
-@celery_app.task(name=TaskName.github_fas_verification, base=HandlerTaskWithRetry)
+@celery_app.task(name=TaskName.github_fas_verification, base=TaskWithRetry)
 def run_github_fas_verification_handler(
     event: dict, package_config: dict, job_config: dict
 ):
@@ -250,7 +251,7 @@ def run_github_fas_verification_handler(
     return get_handlers_task_results(handler.run_job(), event)
 
 
-@celery_app.task(bind=True, name=TaskName.testing_farm, base=HandlerTaskWithRetry)
+@celery_app.task(bind=True, name=TaskName.testing_farm, base=TaskWithRetry)
 def run_testing_farm_handler(
     self,
     event: dict,
@@ -270,7 +271,7 @@ def run_testing_farm_handler(
     return get_handlers_task_results(handler.run_job(), event)
 
 
-@celery_app.task(name=TaskName.testing_farm_results, base=HandlerTaskWithRetry)
+@celery_app.task(name=TaskName.testing_farm_results, base=TaskWithRetry)
 def run_testing_farm_results_handler(
     event: dict, package_config: dict, job_config: dict
 ):
@@ -285,7 +286,7 @@ def run_testing_farm_results_handler(
 @celery_app.task(
     bind=True,
     name=TaskName.propose_downstream,
-    base=HandlerTaskWithRetry,
+    base=TaskWithRetry,
     queue="long-running",
 )
 def run_propose_downstream_handler(
@@ -308,7 +309,7 @@ def run_propose_downstream_handler(
 @celery_app.task(
     bind=True,
     name=TaskName.pull_from_upstream,
-    base=HandlerTaskWithRetry,
+    base=TaskWithRetry,
     queue="long-running",
 )
 def run_pull_from_upstream_handler(
@@ -329,7 +330,7 @@ def run_pull_from_upstream_handler(
 
 
 @celery_app.task(
-    name=TaskName.upstream_koji_build, base=HandlerTaskWithRetry, queue="long-running"
+    name=TaskName.upstream_koji_build, base=TaskWithRetry, queue="long-running"
 )
 def run_koji_build_handler(event: dict, package_config: dict, job_config: dict):
     handler = KojiBuildHandler(
@@ -340,7 +341,7 @@ def run_koji_build_handler(event: dict, package_config: dict, job_config: dict):
     return get_handlers_task_results(handler.run_job(), event)
 
 
-@celery_app.task(name=TaskName.upstream_koji_build_report, base=HandlerTaskWithRetry)
+@celery_app.task(name=TaskName.upstream_koji_build_report, base=TaskWithRetry)
 def run_koji_build_report_handler(event: dict, package_config: dict, job_config: dict):
     handler = KojiTaskReportHandler(
         package_config=load_package_config(package_config),
@@ -351,7 +352,7 @@ def run_koji_build_report_handler(event: dict, package_config: dict, job_config:
 
 
 @celery_app.task(
-    name=TaskName.sync_from_downstream, base=HandlerTaskWithRetry, queue="long-running"
+    name=TaskName.sync_from_downstream, base=TaskWithRetry, queue="long-running"
 )
 def run_sync_from_downstream_handler(
     event: dict, package_config: dict, job_config: dict
@@ -367,7 +368,7 @@ def run_sync_from_downstream_handler(
 @celery_app.task(
     bind=True,
     name=TaskName.downstream_koji_build,
-    base=HandlerTaskWithRetry,
+    base=TaskWithRetry,
     queue="long-running",
 )
 def run_downstream_koji_build(
@@ -390,7 +391,7 @@ def run_downstream_koji_build(
 @celery_app.task(
     bind=True,
     name=TaskName.retrigger_downstream_koji_build,
-    base=HandlerTaskWithRetry,
+    base=TaskWithRetry,
     queue="long-running",
 )
 def run_retrigger_downstream_koji_build(
@@ -410,7 +411,7 @@ def run_retrigger_downstream_koji_build(
     return get_handlers_task_results(handler.run_job(), event)
 
 
-@celery_app.task(name=TaskName.downstream_koji_build_report, base=HandlerTaskWithRetry)
+@celery_app.task(name=TaskName.downstream_koji_build_report, base=TaskWithRetry)
 def run_downstream_koji_build_report(
     event: dict, package_config: dict, job_config: dict
 ):
@@ -425,7 +426,7 @@ def run_downstream_koji_build_report(
 @celery_app.task(
     bind=True,
     name=TaskName.bodhi_update,
-    base=BodhiHandlerTaskWithRetry,
+    base=BodhiTaskWithRetry,
     queue="long-running",
 )
 def run_bodhi_update(
@@ -448,7 +449,7 @@ def run_bodhi_update(
 @celery_app.task(
     bind=True,
     name=TaskName.retrigger_bodhi_update,
-    base=HandlerTaskWithRetry,
+    base=TaskWithRetry,
     queue="long-running",
 )
 def run_retrigger_bodhi_update(
@@ -471,7 +472,7 @@ def run_retrigger_bodhi_update(
 @celery_app.task(
     bind=True,
     name=TaskName.issue_comment_retrigger_bodhi_update,
-    base=HandlerTaskWithRetry,
+    base=TaskWithRetry,
     queue="long-running",
 )
 def run_issue_comment_retrigger_bodhi_update(
@@ -494,7 +495,7 @@ def run_issue_comment_retrigger_bodhi_update(
 @celery_app.task(
     bind=True,
     name=TaskName.vm_image_build,
-    base=HandlerTaskWithRetry,
+    base=TaskWithRetry,
     queue="short-running",
 )
 def run_vm_image_build(self, event: dict, package_config: dict, job_config: dict):
@@ -507,7 +508,7 @@ def run_vm_image_build(self, event: dict, package_config: dict, job_config: dict
     return get_handlers_task_results(handler.run_job(), event)
 
 
-@celery_app.task(name=TaskName.vm_image_build_result, base=HandlerTaskWithRetry)
+@celery_app.task(name=TaskName.vm_image_build_result, base=TaskWithRetry)
 def run_vm_image_build_result(
     self, event: dict, package_config: dict, job_config: dict
 ):
diff --git a/tests/integration/test_bodhi_update.py b/tests/integration/test_bodhi_update.py
index 919daea25..0ca02545e 100644
--- a/tests/integration/test_bodhi_update.py
+++ b/tests/integration/test_bodhi_update.py
@@ -31,7 +31,7 @@
 from packit_service.worker.monitoring import Pushgateway
 from packit_service.worker.tasks import (
     run_bodhi_update,
-    BodhiHandlerTaskWithRetry,
+    BodhiTaskWithRetry,
 )
 from tests.spellbook import first_dict_value, get_parameters_from_results
 
@@ -427,9 +427,7 @@ def test_bodhi_update_for_unknown_koji_build_failed_issue_comment(
         event=event_dict,
         # Needs to be the last try to inform user
         celery_task=flexmock(
-            request=flexmock(
-                retries=BodhiHandlerTaskWithRetry.retry_kwargs["max_retries"]
-            ),
+            request=flexmock(retries=BodhiTaskWithRetry.retry_kwargs["max_retries"]),
             max_retries=DEFAULT_RETRY_LIMIT,
         ),
     ).run_job()
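
Aside on the test change above (illustration only, not project code): the
flexmocked celery_task stands in for a task that is on its final attempt
("Needs to be the last try to inform user"). With a bound task, that
condition can be detected by comparing self.request.retries with
max_retries, roughly like this self-contained sketch:

    # minimal sketch: report to the user only when no retry will follow
    from celery import Celery, Task

    demo_app = Celery("demo")

    class RetryingTask(Task):
        autoretry_for = (Exception,)
        max_retries = 2
        retry_kwargs = {"max_retries": max_retries}

    @demo_app.task(bind=True, base=RetryingTask)
    def report_on_last_try(self):
        try:
            raise RuntimeError("transient failure")  # stand-in for the real work
        except Exception:
            # request.retries counts attempts already made; it is 0 on the first run
            if self.request.retries >= self.max_retries:
                print("giving up, informing the user")  # placeholder for real reporting
            raise  # re-raise so autoretry_for can schedule the retry or record the failure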