Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process message task improvements #2326

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions packit_service/worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def format(self, record):
log_package_versions(package_versions)


class HandlerTaskWithRetry(Task):
class TaskWithRetry(Task):
autoretry_for = (Exception,)
max_retries = int(getenv("CELERY_RETRY_LIMIT", DEFAULT_RETRY_LIMIT))
retry_kwargs = {"max_retries": max_retries}
Expand All @@ -140,7 +140,7 @@ class HandlerTaskWithRetry(Task):
acks_late = True


class BodhiHandlerTaskWithRetry(HandlerTaskWithRetry):
class BodhiTaskWithRetry(TaskWithRetry):
# hardcode for creating bodhi updates to account for the tagging race condition
max_retries = 5
# also disable jitter for the same reason
Expand All @@ -149,7 +149,12 @@ class BodhiHandlerTaskWithRetry(HandlerTaskWithRetry):


@celery_app.task(
name=getenv("CELERY_MAIN_TASK_NAME") or CELERY_DEFAULT_MAIN_TASK_NAME, bind=True
name=getenv("CELERY_MAIN_TASK_NAME") or CELERY_DEFAULT_MAIN_TASK_NAME,
bind=True,
# set a lower time limit for process message as for other tasks
# https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.time_limit
time_limit=300,
base=TaskWithRetry,
)
def process_message(
self, event: dict, source: Optional[str] = None, event_type: Optional[str] = None
Expand Down Expand Up @@ -188,7 +193,7 @@ def babysit_copr_build(self, build_id: int):


# tasks for running the handlers
@celery_app.task(name=TaskName.copr_build_start, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.copr_build_start, base=TaskWithRetry)
def run_copr_build_start_handler(event: dict, package_config: dict, job_config: dict):
handler = CoprBuildStartHandler(
package_config=load_package_config(package_config),
Expand All @@ -198,7 +203,7 @@ def run_copr_build_start_handler(event: dict, package_config: dict, job_config:
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.copr_build_end, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.copr_build_end, base=TaskWithRetry)
def run_copr_build_end_handler(event: dict, package_config: dict, job_config: dict):
handler = CoprBuildEndHandler(
package_config=load_package_config(package_config),
Expand All @@ -209,7 +214,7 @@ def run_copr_build_end_handler(event: dict, package_config: dict, job_config: di


@celery_app.task(
bind=True, name=TaskName.copr_build, base=HandlerTaskWithRetry, queue="long-running"
bind=True, name=TaskName.copr_build, base=TaskWithRetry, queue="long-running"
)
def run_copr_build_handler(
self,
Expand All @@ -228,15 +233,15 @@ def run_copr_build_handler(
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.installation, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.installation, base=TaskWithRetry)
def run_installation_handler(event: dict, package_config: dict, job_config: dict):
handler = GithubAppInstallationHandler(
package_config=None, job_config=None, event=event
)
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.github_fas_verification, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.github_fas_verification, base=TaskWithRetry)
def run_github_fas_verification_handler(
event: dict, package_config: dict, job_config: dict
):
Expand All @@ -246,7 +251,7 @@ def run_github_fas_verification_handler(
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(bind=True, name=TaskName.testing_farm, base=HandlerTaskWithRetry)
@celery_app.task(bind=True, name=TaskName.testing_farm, base=TaskWithRetry)
def run_testing_farm_handler(
self,
event: dict,
Expand All @@ -266,7 +271,7 @@ def run_testing_farm_handler(
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.testing_farm_results, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.testing_farm_results, base=TaskWithRetry)
def run_testing_farm_results_handler(
event: dict, package_config: dict, job_config: dict
):
Expand All @@ -281,7 +286,7 @@ def run_testing_farm_results_handler(
@celery_app.task(
bind=True,
name=TaskName.propose_downstream,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_propose_downstream_handler(
Expand All @@ -304,7 +309,7 @@ def run_propose_downstream_handler(
@celery_app.task(
bind=True,
name=TaskName.pull_from_upstream,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_pull_from_upstream_handler(
Expand All @@ -325,7 +330,7 @@ def run_pull_from_upstream_handler(


@celery_app.task(
name=TaskName.upstream_koji_build, base=HandlerTaskWithRetry, queue="long-running"
name=TaskName.upstream_koji_build, base=TaskWithRetry, queue="long-running"
)
def run_koji_build_handler(event: dict, package_config: dict, job_config: dict):
handler = KojiBuildHandler(
Expand All @@ -336,7 +341,7 @@ def run_koji_build_handler(event: dict, package_config: dict, job_config: dict):
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.upstream_koji_build_report, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.upstream_koji_build_report, base=TaskWithRetry)
def run_koji_build_report_handler(event: dict, package_config: dict, job_config: dict):
handler = KojiTaskReportHandler(
package_config=load_package_config(package_config),
Expand All @@ -347,7 +352,7 @@ def run_koji_build_report_handler(event: dict, package_config: dict, job_config:


@celery_app.task(
name=TaskName.sync_from_downstream, base=HandlerTaskWithRetry, queue="long-running"
name=TaskName.sync_from_downstream, base=TaskWithRetry, queue="long-running"
)
def run_sync_from_downstream_handler(
event: dict, package_config: dict, job_config: dict
Expand All @@ -363,7 +368,7 @@ def run_sync_from_downstream_handler(
@celery_app.task(
bind=True,
name=TaskName.downstream_koji_build,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_downstream_koji_build(
Expand All @@ -386,7 +391,7 @@ def run_downstream_koji_build(
@celery_app.task(
bind=True,
name=TaskName.retrigger_downstream_koji_build,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_retrigger_downstream_koji_build(
Expand All @@ -406,7 +411,7 @@ def run_retrigger_downstream_koji_build(
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.downstream_koji_build_report, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.downstream_koji_build_report, base=TaskWithRetry)
def run_downstream_koji_build_report(
event: dict, package_config: dict, job_config: dict
):
Expand All @@ -421,7 +426,7 @@ def run_downstream_koji_build_report(
@celery_app.task(
bind=True,
name=TaskName.bodhi_update,
base=BodhiHandlerTaskWithRetry,
base=BodhiTaskWithRetry,
queue="long-running",
)
def run_bodhi_update(
Expand All @@ -444,7 +449,7 @@ def run_bodhi_update(
@celery_app.task(
bind=True,
name=TaskName.retrigger_bodhi_update,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_retrigger_bodhi_update(
Expand All @@ -467,7 +472,7 @@ def run_retrigger_bodhi_update(
@celery_app.task(
bind=True,
name=TaskName.issue_comment_retrigger_bodhi_update,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="long-running",
)
def run_issue_comment_retrigger_bodhi_update(
Expand All @@ -490,7 +495,7 @@ def run_issue_comment_retrigger_bodhi_update(
@celery_app.task(
bind=True,
name=TaskName.vm_image_build,
base=HandlerTaskWithRetry,
base=TaskWithRetry,
queue="short-running",
)
def run_vm_image_build(self, event: dict, package_config: dict, job_config: dict):
Expand All @@ -503,7 +508,7 @@ def run_vm_image_build(self, event: dict, package_config: dict, job_config: dict
return get_handlers_task_results(handler.run_job(), event)


@celery_app.task(name=TaskName.vm_image_build_result, base=HandlerTaskWithRetry)
@celery_app.task(name=TaskName.vm_image_build_result, base=TaskWithRetry)
def run_vm_image_build_result(
self, event: dict, package_config: dict, job_config: dict
):
Expand Down
6 changes: 2 additions & 4 deletions tests/integration/test_bodhi_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from packit_service.worker.monitoring import Pushgateway
from packit_service.worker.tasks import (
run_bodhi_update,
BodhiHandlerTaskWithRetry,
BodhiTaskWithRetry,
)
from tests.spellbook import first_dict_value, get_parameters_from_results

Expand Down Expand Up @@ -427,9 +427,7 @@ def test_bodhi_update_for_unknown_koji_build_failed_issue_comment(
event=event_dict,
# Needs to be the last try to inform user
celery_task=flexmock(
request=flexmock(
retries=BodhiHandlerTaskWithRetry.retry_kwargs["max_retries"]
),
request=flexmock(retries=BodhiTaskWithRetry.retry_kwargs["max_retries"]),
max_retries=DEFAULT_RETRY_LIMIT,
),
).run_job()
Expand Down