From 713a0c1675c7a809daf4ad102dfdf5e4cb8ca834 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Mon, 9 Sep 2024 09:45:14 -0400 Subject: [PATCH] Wait for schedules to be cleared before rescheduling Make the scheduling operation throw errors if a schedule already exists --- .../src/oonipipeline/temporal/schedules.py | 160 ++++++++---------- oonipipeline/tests/test_temporal_e2e.py | 17 +- 2 files changed, 89 insertions(+), 88 deletions(-) diff --git a/oonipipeline/src/oonipipeline/temporal/schedules.py b/oonipipeline/src/oonipipeline/temporal/schedules.py index 421efa05..abee47b2 100644 --- a/oonipipeline/src/oonipipeline/temporal/schedules.py +++ b/oonipipeline/src/oonipipeline/temporal/schedules.py @@ -98,93 +98,83 @@ async def schedule_all( existing_schedules = await list_existing_schedules( client=client, probe_cc=probe_cc, test_name=test_name ) - assert ( - len(existing_schedules.observations) < 2 - ), f"duplicate schedule for observations: {existing_schedules.observations}" - assert ( - len(existing_schedules.analysis) < 2 - ), f"duplicate schedule for analysis: {existing_schedules.analysis}" - - if len(existing_schedules.observations) == 1: - schedule_id_map.observations = existing_schedules.observations[0] - if len(existing_schedules.analysis) == 1: - schedule_id_map.analysis = existing_schedules.analysis[0] - - if schedule_id_map.observations is None: - obs_params = ObservationsWorkflowParams( - probe_cc=probe_cc, - test_name=test_name, - clickhouse=clickhouse_url, - data_dir=data_dir, - fast_fail=False, - ) - sched_handle = await client.create_schedule( - id=f"{OBSERVATIONS_SCHED_PREFIX}-{filter_id}-{ts}", - schedule=Schedule( - action=ScheduleActionStartWorkflow( - ObservationsWorkflow.run, - obs_params, - id=f"{OBSERVATIONS_WF_PREFIX}-{filter_id}-{ts}", - task_queue=TASK_QUEUE_NAME, - execution_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, - task_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, - run_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, - ), - spec=ScheduleSpec( - intervals=[ - ScheduleIntervalSpec( - every=timedelta(days=1), offset=timedelta(hours=2) - ) - ], - ), - policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL), - state=ScheduleState( - note="Run the observations workflow every day with an offset of 2 hours to ensure the files have been written to s3" - ), + + assert len(existing_schedules.observations) == 0 + assert len(existing_schedules.analysis) == 0 + + obs_params = ObservationsWorkflowParams( + probe_cc=probe_cc, + test_name=test_name, + clickhouse=clickhouse_url, + data_dir=data_dir, + fast_fail=False, + ) + sched_handle = await client.create_schedule( + id=f"{OBSERVATIONS_SCHED_PREFIX}-{filter_id}-{ts}", + schedule=Schedule( + action=ScheduleActionStartWorkflow( + ObservationsWorkflow.run, + obs_params, + id=f"{OBSERVATIONS_WF_PREFIX}-{filter_id}-{ts}", + task_queue=TASK_QUEUE_NAME, + execution_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, + task_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, + run_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, + ), + spec=ScheduleSpec( + intervals=[ + ScheduleIntervalSpec( + every=timedelta(days=1), offset=timedelta(hours=2) + ) + ], + ), + policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL), + state=ScheduleState( + note="Run the observations workflow every day with an offset of 2 hours to ensure the files have been written to s3" + ), + ), + ) + schedule_id_map.observations = sched_handle.id + + analysis_params = AnalysisWorkflowParams( + probe_cc=probe_cc, + test_name=test_name, + clickhouse=clickhouse_url, + data_dir=data_dir, + fast_fail=False, + ) + sched_handle = await client.create_schedule( + id=f"{ANALYSIS_SCHED_PREFIX}-{filter_id}-{ts}", + schedule=Schedule( + action=ScheduleActionStartWorkflow( + AnalysisWorkflow.run, + analysis_params, + id=f"{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}", + task_queue=TASK_QUEUE_NAME, + execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, + task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, + run_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, ), - ) - schedule_id_map.observations = sched_handle.id - - if schedule_id_map.analysis is None: - analysis_params = AnalysisWorkflowParams( - probe_cc=probe_cc, - test_name=test_name, - clickhouse=clickhouse_url, - data_dir=data_dir, - fast_fail=False, - ) - sched_handle = await client.create_schedule( - id=f"{ANALYSIS_SCHED_PREFIX}-{filter_id}-{ts}", - schedule=Schedule( - action=ScheduleActionStartWorkflow( - AnalysisWorkflow.run, - analysis_params, - id=f"{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}", - task_queue=TASK_QUEUE_NAME, - execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, - task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, - run_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, - ), - spec=ScheduleSpec( - intervals=[ - ScheduleIntervalSpec( - # We offset the Analysis workflow by 4 hours assuming - # that the observation generation will take less than 4 - # hours to complete. - # TODO(art): it's probably better to refactor this into some - # kind of DAG - every=timedelta(days=1), - offset=timedelta(hours=6), - ) - ], - ), - policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL), - state=ScheduleState( - note="Run the analysis workflow every day with an offset of 6 hours to ensure the observation workflow has completed" - ), + spec=ScheduleSpec( + intervals=[ + ScheduleIntervalSpec( + # We offset the Analysis workflow by 4 hours assuming + # that the observation generation will take less than 4 + # hours to complete. + # TODO(art): it's probably better to refactor this into some + # kind of DAG + every=timedelta(days=1), + offset=timedelta(hours=6), + ) + ], ), - ) - schedule_id_map.analysis = sched_handle.id + policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL), + state=ScheduleState( + note="Run the analysis workflow every day with an offset of 6 hours to ensure the observation workflow has completed" + ), + ), + ) + schedule_id_map.analysis = sched_handle.id return schedule_id_map diff --git a/oonipipeline/tests/test_temporal_e2e.py b/oonipipeline/tests/test_temporal_e2e.py index 80b3228f..8faff2be 100644 --- a/oonipipeline/tests/test_temporal_e2e.py +++ b/oonipipeline/tests/test_temporal_e2e.py @@ -1,6 +1,10 @@ import asyncio from concurrent.futures import ThreadPoolExecutor -from oonipipeline.temporal.schedules import schedule_all, clear_schedules +from oonipipeline.temporal.schedules import ( + list_existing_schedules, + schedule_all, + clear_schedules, +) import pytest from temporalio.testing import WorkflowEnvironment @@ -36,8 +40,15 @@ async def test_scheduling(datadir, db): test_name=[], ) - # Wait 2 second for the ID to change - await asyncio.sleep(2) + while True: + await asyncio.sleep(1) + existing = await list_existing_schedules( + client=env.client, + probe_cc=[], + test_name=[], + ) + if len(existing.observations) == 0 and len(existing.analysis) == 0: + break sched_res2 = await schedule_all( client=env.client,