From 38f61e1049191181c2328162e30a666e96782d56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Fri, 6 Sep 2024 14:15:29 -0400 Subject: [PATCH] Fix bug in reschedule command * Add tests for scheduling --- .../src/oonipipeline/temporal/schedules.py | 3 +- oonipipeline/tests/test_temporal_e2e.py | 33 +++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/oonipipeline/src/oonipipeline/temporal/schedules.py b/oonipipeline/src/oonipipeline/temporal/schedules.py index e2a1de7d..732d657f 100644 --- a/oonipipeline/src/oonipipeline/temporal/schedules.py +++ b/oonipipeline/src/oonipipeline/temporal/schedules.py @@ -73,8 +73,9 @@ async def list_existing_schedules( async for sched in schedule_list: if sched.id.startswith(f"{OBSERVATIONS_SCHED_PREFIX}-{filter_id}"): schedule_id_map_list.observations.append(sched.id) - elif sched.id.startswith(f"{ANALYSIS_WF_PREFIX}-{filter_id}"): + elif sched.id.startswith(f"{ANALYSIS_SCHED_PREFIX}-{filter_id}"): schedule_id_map_list.analysis.append(sched.id) + return schedule_id_map_list diff --git a/oonipipeline/tests/test_temporal_e2e.py b/oonipipeline/tests/test_temporal_e2e.py index 4b138147..f09cf1f2 100644 --- a/oonipipeline/tests/test_temporal_e2e.py +++ b/oonipipeline/tests/test_temporal_e2e.py @@ -1,9 +1,10 @@ +import asyncio from concurrent.futures import ThreadPoolExecutor +from oonipipeline.temporal.schedules import schedule_all, reschedule_all import pytest from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker -from temporalio import activity from oonipipeline.temporal.workflows.common import TASK_QUEUE_NAME @@ -15,6 +16,34 @@ from .utils import wait_for_mutations + +@pytest.mark.asyncio +async def test_scheduling(datadir, db): + async with await WorkflowEnvironment.start_local() as env: + sched_res = await schedule_all( + client=env.client, + probe_cc=[], + test_name=[], + clickhouse_url=db.clickhouse_url, + data_dir=str(datadir), + ) + assert sched_res.analysis + assert sched_res.observations + + # Wait 1 second for the ID to change + await asyncio.sleep(1) + + reschedule_res = await reschedule_all( + client=env.client, + probe_cc=[], + test_name=[], + clickhouse_url=db.clickhouse_url, + data_dir=str(datadir), + ) + assert reschedule_res.observations != sched_res.observations + assert reschedule_res.analysis != sched_res.analysis + + @pytest.mark.asyncio async def test_observation_workflow(datadir, db): obs_params = ObservationsWorkflowParams( @@ -25,7 +54,7 @@ async def test_observation_workflow(datadir, db): fast_fail=False, bucket_date="2022-10-21", ) - async with await WorkflowEnvironment.start_time_skipping() as env: + async with await WorkflowEnvironment.start_local() as env: async with Worker( env.client, task_queue=TASK_QUEUE_NAME,