Skip to content

Commit

Permalink
Make process_message task use TaskWithRetry as base
Browse files Browse the repository at this point in the history
Raneme HandlerTaskWithRetry to TaskWithRetry and make
process_message task use it as base as well
so that the process_message tasks are retried as well.
Fixes #2323
  • Loading branch information
lbarcziova committed Feb 2, 2024
1 parent a4df126 commit 7616975
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 26 deletions.
45 changes: 23 additions & 22 deletions packit_service/worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def format(self, record):
log_package_versions(package_versions)


class HandlerTaskWithRetry(Task):
class TaskWithRetry(Task):
autoretry_for = (Exception,)
max_retries = int(getenv("CELERY_RETRY_LIMIT", DEFAULT_RETRY_LIMIT))
retry_kwargs = {"max_retries": max_retries}
Expand All @@ -140,7 +140,7 @@ class HandlerTaskWithRetry(Task):
acks_late = True


class BodhiHandlerTaskWithRetry(HandlerTaskWithRetry):
class BodhiTaskWithRetry(TaskWithRetry):
# hardcode for creating bodhi updates to account for the tagging race condition
max_retries = 5
# also disable jitter for the same reason
Expand All @@ -154,6 +154,7 @@ class BodhiHandlerTaskWithRetry(HandlerTaskWithRetry):
# set a lower time limit for process message as for other tasks
# https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.time_limit
time_limit=300,
base=TaskWithRetry,
)
def process_message(
self, event: dict, source: Optional[str] = None, event_type: Optional[str] = None
Expand Down Expand Up @@ -192,7 +193,7 @@ def babysit_copr_build(self, build_id: int):


# tasks for running the handlers
@celery_app.task(name=TaskName.copr_build_start, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.copr_build_start, base=TaskWithRetry)
def run_copr_build_start_handler(event: dict, package_config: dict, job_config: dict):
handler = CoprBuildStartHandler(
package_config=load_package_config(package_config),
Expand All @@ -202,7 +203,7 @@ def run_copr_build_start_handler(event: dict, package_config: dict, job_config:
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.copr_build_end, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.copr_build_end, base=TaskWithRetry)
def run_copr_build_end_handler(event: dict, package_config: dict, job_config: dict):
handler = CoprBuildEndHandler(
package_config=load_package_config(package_config),
Expand All @@ -213,7 +214,7 @@ def run_copr_build_end_handler(event: dict, package_config: dict, job_config: di


@celery_app.task(
bind=True, name=TaskName.copr_build, base=HandlerTaskWithRetry, queue="long-running"
bind=True, name=TaskName.copr_build, base=TaskWithRetry, queue="long-running"
)
def run_copr_build_handler(
self,
Expand All @@ -232,15 +233,15 @@ def run_copr_build_handler(
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.installation, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.installation, base=TaskWithRetry)
def run_installation_handler(event: dict, package_config: dict, job_config: dict):
handler = GithubAppInstallationHandler(
package_config=None, job_config=None, event=event
)
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.github_fas_verification, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.github_fas_verification, base=TaskWithRetry)
def run_github_fas_verification_handler(
event: dict, package_config: dict, job_config: dict
):
Expand All @@ -250,7 +251,7 @@ def run_github_fas_verification_handler(
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(bind=True, name=TaskName.testing_farm, base=HandlerTaskWithRetry)
@celery_app.task(bind=True, name=TaskName.testing_farm, base=TaskWithRetry)
def run_testing_farm_handler(
self,
event: dict,
Expand All @@ -270,7 +271,7 @@ def run_testing_farm_handler(
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.testing_farm_results, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.testing_farm_results, base=TaskWithRetry)
def run_testing_farm_results_handler(
event: dict, package_config: dict, job_config: dict
):
Expand All @@ -285,7 +286,7 @@ def run_testing_farm_results_handler(
@celery_app.task(
bind=True,
name=TaskName.propose_downstream,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_propose_downstream_handler(
Expand All @@ -308,7 +309,7 @@ def run_propose_downstream_handler(
@celery_app.task(
bind=True,
name=TaskName.pull_from_upstream,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_pull_from_upstream_handler(
Expand All @@ -329,7 +330,7 @@ def run_pull_from_upstream_handler(


@celery_app.task(
name=TaskName.upstream_koji_build, base=HandlerTaskWithRetry, queue="long-running"
name=TaskName.upstream_koji_build, base=TaskWithRetry, queue="long-running"
)
def run_koji_build_handler(event: dict, package_config: dict, job_config: dict):
handler = KojiBuildHandler(
Expand All @@ -340,7 +341,7 @@ def run_koji_build_handler(event: dict, package_config: dict, job_config: dict):
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.upstream_koji_build_report, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.upstream_koji_build_report, base=TaskWithRetry)
def run_koji_build_report_handler(event: dict, package_config: dict, job_config: dict):
handler = KojiTaskReportHandler(
package_config=load_package_config(package_config),
Expand All @@ -351,7 +352,7 @@ def run_koji_build_report_handler(event: dict, package_config: dict, job_config:


@celery_app.task(
name=TaskName.sync_from_downstream, base=HandlerTaskWithRetry, queue="long-running"
name=TaskName.sync_from_downstream, base=TaskWithRetry, queue="long-running"
)
def run_sync_from_downstream_handler(
event: dict, package_config: dict, job_config: dict
Expand All @@ -367,7 +368,7 @@ def run_sync_from_downstream_handler(
@celery_app.task(
bind=True,
name=TaskName.downstream_koji_build,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_downstream_koji_build(
Expand All @@ -390,7 +391,7 @@ def run_downstream_koji_build(
@celery_app.task(
bind=True,
name=TaskName.retrigger_downstream_koji_build,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_retrigger_downstream_koji_build(
Expand All @@ -410,7 +411,7 @@ def run_retrigger_downstream_koji_build(
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.downstream_koji_build_report, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.downstream_koji_build_report, base=TaskWithRetry)
def run_downstream_koji_build_report(
event: dict, package_config: dict, job_config: dict
):
Expand All @@ -425,7 +426,7 @@ def run_downstream_koji_build_report(
@celery_app.task(
bind=True,
name=TaskName.bodhi_update,
base=BodhiHandlerTaskWithRetry,
base=BodhiTaskWithRetry,
queue="long-running",
)
def run_bodhi_update(
Expand All @@ -448,7 +449,7 @@ def run_bodhi_update(
@celery_app.task(
bind=True,
name=TaskName.retrigger_bodhi_update,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_retrigger_bodhi_update(
Expand All @@ -471,7 +472,7 @@ def run_retrigger_bodhi_update(
@celery_app.task(
bind=True,
name=TaskName.issue_comment_retrigger_bodhi_update,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_issue_comment_retrigger_bodhi_update(
Expand All @@ -494,7 +495,7 @@ def run_issue_comment_retrigger_bodhi_update(
@celery_app.task(
bind=True,
name=TaskName.vm_image_build,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="short-running",
)
def run_vm_image_build(self, event: dict, package_config: dict, job_config: dict):
Expand All @@ -507,7 +508,7 @@ def run_vm_image_build(self, event: dict, package_config: dict, job_config: dict
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.vm_image_build_result, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.vm_image_build_result, base=TaskWithRetry)
def run_vm_image_build_result(
self, event: dict, package_config: dict, job_config: dict
):
Expand Down
6 changes: 2 additions & 4 deletions tests/integration/test_bodhi_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from packit_service.worker.monitoring import Pushgateway
from packit_service.worker.tasks import (
run_bodhi_update,
BodhiHandlerTaskWithRetry,
BodhiTaskWithRetry,
)
from tests.spellbook import first_dict_value, get_parameters_from_results

Expand Down Expand Up @@ -427,9 +427,7 @@ def test_bodhi_update_for_unknown_koji_build_failed_issue_comment(
event=event_dict,
# Needs to be the last try to inform user
celery_task=flexmock(
request=flexmock(
retries=BodhiHandlerTaskWithRetry.retry_kwargs["max_retries"]
),
request=flexmock(retries=BodhiTaskWithRetry.retry_kwargs["max_retries"]),
max_retries=DEFAULT_RETRY_LIMIT,
),
).run_job()
Expand Down

0 comments on commit 7616975

Please sign in to comment.