Wait for schedules to be cleared before rescheduling
Make the scheduling operation throw errors if a schedule already exists
hellais committed Sep 9, 2024
1 parent 83fbfdd commit 713a0c1
Showing 2 changed files with 89 additions and 88 deletions.
oonipipeline/src/oonipipeline/temporal/schedules.py (160 changes: 75 additions & 85 deletions)

```diff
@@ -98,93 +98,83 @@ async def schedule_all(
     existing_schedules = await list_existing_schedules(
         client=client, probe_cc=probe_cc, test_name=test_name
     )
-    assert (
-        len(existing_schedules.observations) < 2
-    ), f"duplicate schedule for observations: {existing_schedules.observations}"
-    assert (
-        len(existing_schedules.analysis) < 2
-    ), f"duplicate schedule for analysis: {existing_schedules.analysis}"
-
-    if len(existing_schedules.observations) == 1:
-        schedule_id_map.observations = existing_schedules.observations[0]
-    if len(existing_schedules.analysis) == 1:
-        schedule_id_map.analysis = existing_schedules.analysis[0]
+    assert len(existing_schedules.observations) == 0
+    assert len(existing_schedules.analysis) == 0
 
-    if schedule_id_map.observations is None:
-        obs_params = ObservationsWorkflowParams(
-            probe_cc=probe_cc,
-            test_name=test_name,
-            clickhouse=clickhouse_url,
-            data_dir=data_dir,
-            fast_fail=False,
-        )
-        sched_handle = await client.create_schedule(
-            id=f"{OBSERVATIONS_SCHED_PREFIX}-{filter_id}-{ts}",
-            schedule=Schedule(
-                action=ScheduleActionStartWorkflow(
-                    ObservationsWorkflow.run,
-                    obs_params,
-                    id=f"{OBSERVATIONS_WF_PREFIX}-{filter_id}-{ts}",
-                    task_queue=TASK_QUEUE_NAME,
-                    execution_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT,
-                    task_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT,
-                    run_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT,
-                ),
-                spec=ScheduleSpec(
-                    intervals=[
-                        ScheduleIntervalSpec(
-                            every=timedelta(days=1), offset=timedelta(hours=2)
-                        )
-                    ],
-                ),
-                policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL),
-                state=ScheduleState(
-                    note="Run the observations workflow every day with an offset of 2 hours to ensure the files have been written to s3"
-                ),
-            ),
-        )
-        schedule_id_map.observations = sched_handle.id
+    obs_params = ObservationsWorkflowParams(
+        probe_cc=probe_cc,
+        test_name=test_name,
+        clickhouse=clickhouse_url,
+        data_dir=data_dir,
+        fast_fail=False,
+    )
+    sched_handle = await client.create_schedule(
+        id=f"{OBSERVATIONS_SCHED_PREFIX}-{filter_id}-{ts}",
+        schedule=Schedule(
+            action=ScheduleActionStartWorkflow(
+                ObservationsWorkflow.run,
+                obs_params,
+                id=f"{OBSERVATIONS_WF_PREFIX}-{filter_id}-{ts}",
+                task_queue=TASK_QUEUE_NAME,
+                execution_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT,
+                task_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT,
+                run_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT,
+            ),
+            spec=ScheduleSpec(
+                intervals=[
+                    ScheduleIntervalSpec(
+                        every=timedelta(days=1), offset=timedelta(hours=2)
+                    )
+                ],
+            ),
+            policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL),
+            state=ScheduleState(
+                note="Run the observations workflow every day with an offset of 2 hours to ensure the files have been written to s3"
+            ),
+        ),
+    )
+    schedule_id_map.observations = sched_handle.id
 
-    if schedule_id_map.analysis is None:
-        analysis_params = AnalysisWorkflowParams(
-            probe_cc=probe_cc,
-            test_name=test_name,
-            clickhouse=clickhouse_url,
-            data_dir=data_dir,
-            fast_fail=False,
-        )
-        sched_handle = await client.create_schedule(
-            id=f"{ANALYSIS_SCHED_PREFIX}-{filter_id}-{ts}",
-            schedule=Schedule(
-                action=ScheduleActionStartWorkflow(
-                    AnalysisWorkflow.run,
-                    analysis_params,
-                    id=f"{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}",
-                    task_queue=TASK_QUEUE_NAME,
-                    execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
-                    task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
-                    run_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
-                ),
-                spec=ScheduleSpec(
-                    intervals=[
-                        ScheduleIntervalSpec(
-                            # We offset the Analysis workflow by 4 hours assuming
-                            # that the observation generation will take less than 4
-                            # hours to complete.
-                            # TODO(art): it's probably better to refactor this into some
-                            # kind of DAG
-                            every=timedelta(days=1),
-                            offset=timedelta(hours=6),
-                        )
-                    ],
-                ),
-                policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL),
-                state=ScheduleState(
-                    note="Run the analysis workflow every day with an offset of 6 hours to ensure the observation workflow has completed"
-                ),
-            ),
-        )
-        schedule_id_map.analysis = sched_handle.id
+    analysis_params = AnalysisWorkflowParams(
+        probe_cc=probe_cc,
+        test_name=test_name,
+        clickhouse=clickhouse_url,
+        data_dir=data_dir,
+        fast_fail=False,
+    )
+    sched_handle = await client.create_schedule(
+        id=f"{ANALYSIS_SCHED_PREFIX}-{filter_id}-{ts}",
+        schedule=Schedule(
+            action=ScheduleActionStartWorkflow(
+                AnalysisWorkflow.run,
+                analysis_params,
+                id=f"{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}",
+                task_queue=TASK_QUEUE_NAME,
+                execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
+                task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
+                run_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
+            ),
+            spec=ScheduleSpec(
+                intervals=[
+                    ScheduleIntervalSpec(
+                        # We offset the Analysis workflow by 4 hours assuming
+                        # that the observation generation will take less than 4
+                        # hours to complete.
+                        # TODO(art): it's probably better to refactor this into some
+                        # kind of DAG
+                        every=timedelta(days=1),
+                        offset=timedelta(hours=6),
+                    )
+                ],
+            ),
+            policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL),
+            state=ScheduleState(
+                note="Run the analysis workflow every day with an offset of 6 hours to ensure the observation workflow has completed"
+            ),
+        ),
+    )
+    schedule_id_map.analysis = sched_handle.id
 
     return schedule_id_map
```
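The effect of the change: `schedule_all` no longer reuses a schedule it finds. It now asserts that none exist, raising an `AssertionError` (the "throw errors" of the commit message) whenever a matching schedule is still present. Since clearing schedules evidently takes a moment to propagate on the Temporal server (hence the polling loop added to the test below), a caller that wants to reschedule has to wait for the listing to come up empty first. A minimal sketch of that caller-side pattern, assuming the `client`/`probe_cc`/`test_name` keyword interface visible in the diffs; the `wait_until_schedules_cleared` helper and its `timeout` are hypothetical additions, not part of this commit:

```python
import asyncio

from oonipipeline.temporal.schedules import (
    clear_schedules,
    list_existing_schedules,
)


async def wait_until_schedules_cleared(client, probe_cc, test_name, timeout=60.0):
    # Hypothetical helper: request deletion, then poll until the listing is
    # actually empty, since schedule_all now asserts that no schedules exist.
    await clear_schedules(client=client, probe_cc=probe_cc, test_name=test_name)
    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        existing = await list_existing_schedules(
            client=client, probe_cc=probe_cc, test_name=test_name
        )
        if len(existing.observations) == 0 and len(existing.analysis) == 0:
            return
        if asyncio.get_running_loop().time() >= deadline:
            raise TimeoutError("schedules were not cleared in time")
        await asyncio.sleep(1)
```

Once the helper returns, calling `schedule_all` again should not trip the new assertions.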
oonipipeline/tests/test_temporal_e2e.py (17 changes: 14 additions & 3 deletions)

```diff
@@ -1,6 +1,10 @@
 import asyncio
 from concurrent.futures import ThreadPoolExecutor
-from oonipipeline.temporal.schedules import schedule_all, clear_schedules
+from oonipipeline.temporal.schedules import (
+    list_existing_schedules,
+    schedule_all,
+    clear_schedules,
+)
 import pytest
 
 from temporalio.testing import WorkflowEnvironment
@@ -36,8 +40,15 @@ async def test_scheduling(datadir, db):
         test_name=[],
     )
 
-    # Wait 2 second for the ID to change
-    await asyncio.sleep(2)
+    while True:
+        await asyncio.sleep(1)
+        existing = await list_existing_schedules(
+            client=env.client,
+            probe_cc=[],
+            test_name=[],
+        )
+        if len(existing.observations) == 0 and len(existing.analysis) == 0:
+            break
 
     sched_res2 = await schedule_all(
         client=env.client,
```
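One caveat in the updated test: the `while True` loop has no upper bound, so if deletion never completes the test hangs rather than fails. A possible hardening, sketched here as an assumption rather than anything this commit does, is to cap the wait with `asyncio.wait_for`:

```python
import asyncio

# Sketch: the same polling loop, bounded so a stuck deletion fails the test
# after 60 seconds instead of hanging it indefinitely.
async def _schedules_cleared(client):
    while True:
        existing = await list_existing_schedules(
            client=client, probe_cc=[], test_name=[]
        )
        if len(existing.observations) == 0 and len(existing.analysis) == 0:
            return
        await asyncio.sleep(1)

# Inside the async test body:
await asyncio.wait_for(_schedules_cleared(env.client), timeout=60)
```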
