From 7283f248ecd7af654459a393ccd2576637504750 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arturo=20Filast=C3=B2?=
Date: Fri, 6 Sep 2024 11:49:00 -0400
Subject: [PATCH] Refactoring of schedule and reschedule commands

---
 oonipipeline/src/oonipipeline/cli/commands.py | 122 +++-----
 .../temporal/client_operations.py             | 121 ++++----
 .../src/oonipipeline/temporal/schedules.py    | 292 +++++++++++-------
 3 files changed, 295 insertions(+), 240 deletions(-)

diff --git a/oonipipeline/src/oonipipeline/cli/commands.py b/oonipipeline/src/oonipipeline/cli/commands.py
index 77a36a72..23f971fb 100644
--- a/oonipipeline/src/oonipipeline/cli/commands.py
+++ b/oonipipeline/src/oonipipeline/cli/commands.py
@@ -9,18 +9,13 @@
     run_backfill,
     run_create_schedules,
     run_status,
+    run_reschedule,
 )
 from oonipipeline.temporal.workers import start_workers
 
 import click
 from click_loglevel import LogLevel
 
-from ..temporal.workflows.observations import (
-    ObservationsWorkflowParams,
-)
-from ..temporal.workflows.analysis import (
-    AnalysisWorkflowParams,
-)
 from ..__about__ import VERSION
 from ..db.connections import ClickhouseConnection
 
@@ -120,7 +115,9 @@ def cli(log_level: int):
 @cli.command()
 @start_at_option
 @end_at_option
-@click.option("--schedule-id", type=str, required=True)
+@probe_cc_option
+@test_name_option
+@click.option("--workflow-name", type=str, required=True)
 @click.option(
     "--create-tables",
     is_flag=True,
@@ -132,16 +129,18 @@ def cli(log_level: int):
     help="should we drop tables before creating them",
 )
 def backfill(
+    probe_cc: List[str],
+    test_name: List[str],
+    workflow_name: str,
     start_at: datetime,
     end_at: datetime,
     create_tables: bool,
     drop_tables: bool,
-    schedule_id: str,
 ):
     """
     Backfill for OONI measurements and write them into clickhouse
     """
-    click.echo(f"Runnning backfill of schedule {schedule_id}")
+    click.echo(f"Running backfill of workflow {workflow_name}")
 
     maybe_create_delete_tables(
         clickhouse_url=config.clickhouse_url,
@@ -161,8 +160,10 @@ def backfill(
     )
 
     run_backfill(
-        schedule_id=schedule_id,
+        workflow_name=workflow_name,
         temporal_config=temporal_config,
+        probe_cc=probe_cc,
+        test_name=test_name,
         start_at=start_at,
         end_at=end_at,
     )
@@ -176,66 +177,47 @@ def backfill(
     is_flag=True,
     help="should we fail immediately when we encounter an error?",
 )
-@click.option(
-    "--analysis/--no-analysis",
-    is_flag=True,
-    help="should we schedule an analysis",
-    default=False,
-)
-@click.option(
-    "--observations/--no-observations",
-    is_flag=True,
-    help="should we schedule observations",
-    default=True,
-)
-@click.option(
-    "--delete",
-    is_flag=True,
-    default=False,
-    help="if we should delete the schedule instead of creating it",
-)
-@click.option(
-    "--create-tables",
-    is_flag=True,
-    help="should we attempt to create the required clickhouse tables",
-)
-@click.option(
-    "--drop-tables",
-    is_flag=True,
-    help="should we drop tables before creating them",
-)
 def schedule(
     probe_cc: List[str],
     test_name: List[str],
     fast_fail: bool,
-    create_tables: bool,
-    drop_tables: bool,
-    analysis: bool,
-    observations: bool,
-    delete: bool,
 ):
     """
     Create schedules for the specified parameters
     """
-    if not observations and not analysis:
-        click.echo("either observations or analysis should be set")
-        return 1
+    temporal_config = TemporalConfig(
+        telemetry_endpoint=config.telemetry_endpoint,
+        prometheus_bind_address=config.prometheus_bind_address,
+        temporal_address=config.temporal_address,
+        temporal_namespace=config.temporal_namespace,
+        temporal_tls_client_cert_path=config.temporal_tls_client_cert_path,
+        temporal_tls_client_key_path=config.temporal_tls_client_key_path,
+    )
 
-    maybe_create_delete_tables(
+    run_create_schedules(
+        probe_cc=probe_cc,
+        test_name=test_name,
         clickhouse_url=config.clickhouse_url,
-        create_tables=create_tables,
-        drop_tables=drop_tables,
-        clickhouse_buffer_min_time=config.clickhouse_buffer_min_time,
-        clickhouse_buffer_max_time=config.clickhouse_buffer_max_time,
+        data_dir=config.data_dir,
+        temporal_config=temporal_config,
     )
 
-    what_we_schedule = []
-    if analysis:
-        what_we_schedule.append("analysis")
-    if observations:
-        what_we_schedule.append("observations")
 
-    click.echo(f"Scheduling {' and'.join(what_we_schedule)}")
+@cli.command()
+@probe_cc_option
+@test_name_option
+@click.option(
+    "--fast-fail",
+    is_flag=True,
+    help="should we fail immediately when we encounter an error?",
+)
+def reschedule(
+    probe_cc: List[str],
+    test_name: List[str],
+    fast_fail: bool,
+):
+    """
+    Delete and recreate the schedules for the specified parameters
+    """
     temporal_config = TemporalConfig(
         telemetry_endpoint=config.telemetry_endpoint,
         prometheus_bind_address=config.prometheus_bind_address,
         temporal_address=config.temporal_address,
@@ -244,29 +226,13 @@ def schedule(
         temporal_tls_client_cert_path=config.temporal_tls_client_cert_path,
         temporal_tls_client_key_path=config.temporal_tls_client_key_path,
     )
-    obs_params = None
-    if observations:
-        obs_params = ObservationsWorkflowParams(
-            probe_cc=probe_cc,
-            test_name=test_name,
-            clickhouse=config.clickhouse_url,
-            data_dir=config.data_dir,
-            fast_fail=fast_fail,
-        )
-    analysis_params = None
-    if analysis:
-        analysis_params = AnalysisWorkflowParams(
-            probe_cc=probe_cc,
-            test_name=test_name,
-            clickhouse=config.clickhouse_url,
-            data_dir=config.data_dir,
-        )
 
-    run_create_schedules(
-        obs_params=obs_params,
-        analysis_params=analysis_params,
+    run_reschedule(
+        probe_cc=probe_cc,
+        test_name=test_name,
+        clickhouse_url=config.clickhouse_url,
+        data_dir=config.data_dir,
         temporal_config=temporal_config,
-        delete=delete,
     )
 
 
diff --git a/oonipipeline/src/oonipipeline/temporal/client_operations.py b/oonipipeline/src/oonipipeline/temporal/client_operations.py
index 962a8cae..ab3177b8 100644
--- a/oonipipeline/src/oonipipeline/temporal/client_operations.py
+++ b/oonipipeline/src/oonipipeline/temporal/client_operations.py
@@ -1,15 +1,13 @@
 import asyncio
 import logging
 from dataclasses import dataclass
-from datetime import datetime, timedelta
+from datetime import datetime
 from typing import List, Optional, Tuple
 
-from oonipipeline.temporal.workflows.analysis import AnalysisWorkflowParams
-from oonipipeline.temporal.workflows.observations import ObservationsWorkflowParams
-
 from oonipipeline.temporal.schedules import (
-    schedule_analysis,
-    schedule_observations,
+    ScheduleIdMap,
+    reschedule_all,
+    schedule_all,
+    schedule_backfill,
 )
 
 from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
@@ -20,8 +18,6 @@
 from temporalio.client import (
     Client as TemporalClient,
-    ScheduleBackfill,
-    ScheduleOverlapPolicy,
     WorkflowExecution,
 )
 from temporalio.service import TLSConfig
@@ -116,63 +112,45 @@ async def temporal_connect(
 
 
 async def execute_backfill(
-    schedule_id: str,
-    temporal_config: TemporalConfig,
+    probe_cc: List[str],
+    test_name: List[str],
     start_at: datetime,
     end_at: datetime,
+    workflow_name: str,
+    temporal_config: TemporalConfig,
 ):
-    log.info(f"running backfill for schedule_id={schedule_id}")
+    log.info(f"running backfill of workflow {workflow_name}")
     client = await temporal_connect(temporal_config=temporal_config)
 
-    found_schedule_id = None
-    schedule_list = await client.list_schedules()
-    async for sched in schedule_list:
-        if sched.id.startswith(schedule_id):
-            found_schedule_id = sched.id
-            break
-    if not found_schedule_id:
-        log.error(f"schedule ID not found for prefix {schedule_id}")
-        return
-
-    handle = client.get_schedule_handle(found_schedule_id)
-    await handle.backfill(
-        ScheduleBackfill(
-            start_at=start_at + timedelta(hours=1),
-            end_at=end_at + timedelta(hours=1),
-            overlap=ScheduleOverlapPolicy.ALLOW_ALL,
-        ),
+    return await schedule_backfill(
+        client=client,
+        probe_cc=probe_cc,
+        test_name=test_name,
+        start_at=start_at,
+        end_at=end_at,
+        workflow_name=workflow_name,
     )
 
 
 async def create_schedules(
-    obs_params: Optional[ObservationsWorkflowParams],
-    analysis_params: Optional[AnalysisWorkflowParams],
+    probe_cc: List[str],
+    test_name: List[str],
+    clickhouse_url: str,
+    data_dir: str,
     temporal_config: TemporalConfig,
-    delete: bool = False,
-) -> dict:
+) -> ScheduleIdMap:
     log.info(f"creating all schedules")
     client = await temporal_connect(temporal_config=temporal_config)
 
-    obs_schedule_id = None
-    if obs_params is not None:
-        obs_schedule_id = await schedule_observations(
-            client=client, params=obs_params, delete=delete
-        )
-        log.info(f"created schedule observations schedule with ID={obs_schedule_id}")
-
-    analysis_schedule_id = None
-    if analysis_params is not None:
-        analysis_schedule_id = await schedule_analysis(
-            client=client, params=analysis_params, delete=delete
-        )
-        log.info(f"created schedule analysis schedule with ID={analysis_schedule_id}")
-
-    return {
-        "analysis_schedule_id": analysis_schedule_id,
-        "observations_schedule_id": obs_schedule_id,
-    }
+    return await schedule_all(
+        client=client,
+        probe_cc=probe_cc,
+        test_name=test_name,
+        clickhouse_url=clickhouse_url,
+        data_dir=data_dir,
+    )
+
+
+async def execute_reschedule(
+    probe_cc: List[str],
+    test_name: List[str],
+    clickhouse_url: str,
+    data_dir: str,
+    temporal_config: TemporalConfig,
+) -> ScheduleIdMap:
+    log.info(f"rescheduling all schedules")
+    client = await temporal_connect(temporal_config=temporal_config)
+
+    return await reschedule_all(
+        client=client,
+        probe_cc=probe_cc,
+        test_name=test_name,
+        clickhouse_url=clickhouse_url,
+        data_dir=data_dir,
+    )
 
 
 async def get_status(
@@ -214,7 +192,9 @@ async def get_status(
 
 def run_backfill(
     temporal_config: TemporalConfig,
-    schedule_id: str,
+    probe_cc: List[str],
+    test_name: List[str],
+    workflow_name: str,
     start_at: datetime,
     end_at: datetime,
 ):
@@ -222,7 +202,9 @@ def run_backfill(
     asyncio.run(
         execute_backfill(
             temporal_config=temporal_config,
-            schedule_id=schedule_id,
+            workflow_name=workflow_name,
+            probe_cc=probe_cc,
+            test_name=test_name,
             start_at=start_at,
             end_at=end_at,
         )
@@ -232,18 +214,41 @@ def run_backfill(
 
 
 def run_create_schedules(
-    obs_params: Optional[ObservationsWorkflowParams],
-    analysis_params: Optional[AnalysisWorkflowParams],
+    probe_cc: List[str],
+    test_name: List[str],
+    clickhouse_url: str,
+    data_dir: str,
+    temporal_config: TemporalConfig,
+):
+    try:
+        asyncio.run(
+            create_schedules(
+                probe_cc=probe_cc,
+                test_name=test_name,
+                clickhouse_url=clickhouse_url,
+                data_dir=data_dir,
+                temporal_config=temporal_config,
+            )
+        )
+    except KeyboardInterrupt:
+        print("shutting down")
+
+
+def run_reschedule(
+    probe_cc: List[str],
+    test_name: List[str],
+    clickhouse_url: str,
+    data_dir: str,
     temporal_config: TemporalConfig,
-    delete: bool,
 ):
     try:
         asyncio.run(
-            create_schedules(
-                obs_params=obs_params,
-                analysis_params=analysis_params,
+            execute_reschedule(
+                probe_cc=probe_cc,
+                test_name=test_name,
+                clickhouse_url=clickhouse_url,
+                data_dir=data_dir,
                 temporal_config=temporal_config,
-                delete=delete,
             )
         )
     except KeyboardInterrupt:
 
diff --git a/oonipipeline/src/oonipipeline/temporal/schedules.py b/oonipipeline/src/oonipipeline/temporal/schedules.py
index 5c729a7d..b217eee1 100644
--- a/oonipipeline/src/oonipipeline/temporal/schedules.py
+++ b/oonipipeline/src/oonipipeline/temporal/schedules.py
@@ -1,4
+1,5 @@ -from typing import List +from dataclasses import dataclass +from typing import List, Optional, TypedDict import logging from datetime import datetime, timedelta, timezone @@ -17,6 +18,7 @@ from temporalio.client import ( Client as TemporalClient, Schedule, + ScheduleBackfill, ScheduleActionStartWorkflow, ScheduleIntervalSpec, ScheduleSpec, @@ -27,8 +29,13 @@ log = logging.getLogger("oonipipeline.workflows") +OBSERVATIONS_SCHED_PREFIX = "oopln-sched-observations" +OBSERVATIONS_WF_PREFIX = "oopln-wf-observations" +ANALYSIS_WF_PREFIX = "oopln-wf-analysis" +ANALYSIS_SCHED_PREFIX = "oopln-sched-analysis" -def gen_schedule_id(probe_cc: List[str], test_name: List[str], name: str): + +def gen_schedule_filter_id(probe_cc: List[str], test_name: List[str]): probe_cc_key = "ALLCCS" if len(probe_cc) > 0: probe_cc_key = ".".join(map(lambda x: x.lower(), sorted(probe_cc))) @@ -36,125 +43,202 @@ def gen_schedule_id(probe_cc: List[str], test_name: List[str], name: str): if len(test_name) > 0: test_name_key = ".".join(map(lambda x: x.lower(), sorted(test_name))) - return f"oonipipeline-{name}-schedule-{probe_cc_key}-{test_name_key}" + return f"{probe_cc_key}-{test_name_key}" -async def schedule_observations( - client: TemporalClient, params: ObservationsWorkflowParams, delete: bool -) -> List[str]: - base_schedule_id = gen_schedule_id( - params.probe_cc, params.test_name, "observations" - ) +@dataclass +class ScheduleIdMap: + observations: Optional[str] = None + analysis: Optional[str] = None - existing_schedules = [] - schedule_list = await client.list_schedules() - async for sched in schedule_list: - if sched.id.startswith(base_schedule_id): - existing_schedules.append(sched.id) - if delete is True: - for sched_id in existing_schedules: - schedule_handle = client.get_schedule_handle(sched_id) - await schedule_handle.delete() - return existing_schedules +@dataclass +class ScheduleIdMapList: + observations: List[str] + analysis: List[str] - if len(existing_schedules) == 1: - return existing_schedules - elif len(existing_schedules) > 0: - print("WARNING: multiple schedules detected") - return existing_schedules - ts = datetime.now(timezone.utc).strftime("%y.%m.%d_%H%M%S") - schedule_id = f"{base_schedule_id}-{ts}" - - await client.create_schedule( - id=schedule_id, - schedule=Schedule( - action=ScheduleActionStartWorkflow( - ObservationsWorkflow.run, - params, - id=schedule_id.replace("-schedule-", "-workflow-"), - task_queue=TASK_QUEUE_NAME, - execution_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, - task_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, - run_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, - ), - spec=ScheduleSpec( - intervals=[ - ScheduleIntervalSpec( - every=timedelta(days=1), offset=timedelta(hours=2) - ) - ], - ), - policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.BUFFER_ALL), - state=ScheduleState( - note="Run the observations workflow every day with an offset of 2 hours to ensure the files have been written to s3" - ), - ), +async def list_existing_schedules( + client: TemporalClient, + probe_cc: List[str], + test_name: List[str], +): + schedule_id_map_list = ScheduleIdMapList( + observations=[], + analysis=[], ) - return [schedule_id] - + filter_id = gen_schedule_filter_id(probe_cc, test_name) -async def schedule_analysis( - client: TemporalClient, params: AnalysisWorkflowParams, delete: bool -) -> List[str]: - base_schedule_id = gen_schedule_id(params.probe_cc, params.test_name, "analysis") - - existing_schedules = [] schedule_list = await client.list_schedules() 
     async for sched in schedule_list:
-        if sched.id.startswith(base_schedule_id):
-            existing_schedules.append(sched.id)
-
-    if delete is True:
-        for sched_id in existing_schedules:
-            schedule_handle = client.get_schedule_handle(sched_id)
-            await schedule_handle.delete()
-        return existing_schedules
-
-    if len(existing_schedules) == 1:
-        return existing_schedules
-    elif len(existing_schedules) > 0:
-        print("WARNING: multiple schedules detected")
-        return existing_schedules
-
+        if sched.id.startswith(f"{OBSERVATIONS_SCHED_PREFIX}-{filter_id}"):
+            schedule_id_map_list.observations.append(sched.id)
+        elif sched.id.startswith(f"{ANALYSIS_SCHED_PREFIX}-{filter_id}"):
+            schedule_id_map_list.analysis.append(sched.id)
+    return schedule_id_map_list
+
+
+async def schedule_all(
+    client: TemporalClient,
+    probe_cc: List[str],
+    test_name: List[str],
+    clickhouse_url: str,
+    data_dir: str,
+) -> ScheduleIdMap:
+    schedule_id_map = ScheduleIdMap()
+    filter_id = gen_schedule_filter_id(probe_cc, test_name)
     # We need to append a timestamp to the schedule so that we are able to rerun
     # the backfill operations by deleting the existing schedule and
     # re-scheduling it. Not doing so will mean that temporal will believe the
     # workflow has already been execututed and will refuse to re-run it.
     # TODO(art): check if there is a more idiomatic way of implementing this
     ts = datetime.now(timezone.utc).strftime("%y.%m.%d_%H%M%S")
-    schedule_id = f"{base_schedule_id}-{ts}"
-
-    await client.create_schedule(
-        id=schedule_id,
-        schedule=Schedule(
-            action=ScheduleActionStartWorkflow(
-                AnalysisWorkflow.run,
-                params,
-                id=schedule_id.replace("-schedule-", "-workflow-"),
-                task_queue=TASK_QUEUE_NAME,
-                execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
-                task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
-                run_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT,
-            ),
-            spec=ScheduleSpec(
-                intervals=[
-                    ScheduleIntervalSpec(
-                        # We offset the Analysis workflow by 4 hours assuming
-                        # that the observation generation will take less than 4
-                        # hours to complete.
- # TODO(art): it's probably better to refactor this into some - # kind of DAG - every=timedelta(days=1), - offset=timedelta(hours=6), - ) - ], + + existing_schedules = await list_existing_schedules( + client=client, probe_cc=probe_cc, test_name=test_name + ) + assert ( + len(existing_schedules.observations) < 2 + ), f"duplicate schedule for observations: {existing_schedules.observations}" + assert ( + len(existing_schedules.analysis) < 2 + ), f"duplicate schedule for analysis: {existing_schedules.analysis}" + + if len(existing_schedules.observations) == 1: + schedule_id_map.observations = existing_schedules.observations[0] + if len(existing_schedules.analysis) == 1: + schedule_id_map.analysis = existing_schedules.analysis[0] + + if schedule_id_map.observations is None: + obs_params = ObservationsWorkflowParams( + probe_cc=probe_cc, + test_name=test_name, + clickhouse=clickhouse_url, + data_dir=data_dir, + fast_fail=False, + ) + sched_handle = await client.create_schedule( + id=f"{OBSERVATIONS_SCHED_PREFIX}-{filter_id}-{ts}", + schedule=Schedule( + action=ScheduleActionStartWorkflow( + ObservationsWorkflow.run, + obs_params, + id=f"{OBSERVATIONS_WF_PREFIX}-{filter_id}-{ts}", + task_queue=TASK_QUEUE_NAME, + execution_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, + task_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, + run_timeout=MAKE_OBSERVATIONS_START_TO_CLOSE_TIMEOUT, + ), + spec=ScheduleSpec( + intervals=[ + ScheduleIntervalSpec( + every=timedelta(days=1), offset=timedelta(hours=2) + ) + ], + ), + policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.BUFFER_ALL), + state=ScheduleState( + note="Run the observations workflow every day with an offset of 2 hours to ensure the files have been written to s3" + ), ), - policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.BUFFER_ALL), - state=ScheduleState( - note="Run the analysis workflow every day with an offset of 6 hours to ensure the observation workflow has completed" + ) + schedule_id_map.observations = sched_handle.id + + if schedule_id_map.analysis is None: + analysis_params = AnalysisWorkflowParams( + probe_cc=probe_cc, + test_name=test_name, + clickhouse=clickhouse_url, + data_dir=data_dir, + fast_fail=False, + ) + sched_handle = await client.create_schedule( + id=f"{ANALYSIS_SCHED_PREFIX}-{filter_id}-{ts}", + schedule=Schedule( + action=ScheduleActionStartWorkflow( + AnalysisWorkflow.run, + analysis_params, + id=f"{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}", + task_queue=TASK_QUEUE_NAME, + execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, + task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, + run_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, + ), + spec=ScheduleSpec( + intervals=[ + ScheduleIntervalSpec( + # We offset the Analysis workflow by 4 hours assuming + # that the observation generation will take less than 4 + # hours to complete. 
+ # TODO(art): it's probably better to refactor this into some + # kind of DAG + every=timedelta(days=1), + offset=timedelta(hours=6), + ) + ], + ), + policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.BUFFER_ALL), + state=ScheduleState( + note="Run the analysis workflow every day with an offset of 6 hours to ensure the observation workflow has completed" + ), ), + ) + schedule_id_map.analysis = sched_handle.id + + return schedule_id_map + + +async def reschedule_all( + client: TemporalClient, + probe_cc: List[str], + test_name: List[str], + clickhouse_url: str, + data_dir: str, +) -> ScheduleIdMap: + existing_schedules = await list_existing_schedules( + client=client, probe_cc=probe_cc, test_name=test_name + ) + for schedule_id in existing_schedules.observations + existing_schedules.analysis: + await client.get_schedule_handle(schedule_id).delete() + + return await schedule_all( + client=client, + probe_cc=probe_cc, + test_name=test_name, + clickhouse_url=clickhouse_url, + data_dir=data_dir, + ) + + +async def schedule_backfill( + client: TemporalClient, + workflow_name: str, + start_at: datetime, + end_at: datetime, + probe_cc: List[str], + test_name: List[str], +): + existing_schedules = await list_existing_schedules( + client=client, probe_cc=probe_cc, test_name=test_name + ) + if workflow_name == "observations": + assert ( + len(existing_schedules.observations) == 1 + ), "Expected one schedule for observations" + schedule_id = existing_schedules.observations[0] + elif workflow_name == "analysis": + assert ( + len(existing_schedules.analysis) == 1 + ), "Expected one schedule for analysis" + schedule_id = existing_schedules.analysis[0] + else: + raise ValueError(f"Unknown workflow name: {workflow_name}") + + handle = client.get_schedule_handle(schedule_id) + await handle.backfill( + ScheduleBackfill( + start_at=start_at + timedelta(hours=1), + end_at=end_at + timedelta(hours=1), + overlap=ScheduleOverlapPolicy.BUFFER_ALL, ), ) - return [schedule_id]
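-- 
Usage notes (appended after the diff, not part of what gets applied): a minimal sketch of how the refactored CLI is meant to be driven, using click's test runner. The --probe-cc/--test-name/--start-at/--end-at spellings and the date format come from the pre-existing option helpers (probe_cc_option, test_name_option, start_at_option, end_at_option), which are outside this diff, so treat them as assumptions.

# Sketch: exercising the refactored commands via click's CliRunner.
# Flag spellings and date formats are assumptions; the option helpers are not shown in the diff.
from click.testing import CliRunner

from oonipipeline.cli.commands import cli

runner = CliRunner()

# Create (or reuse) the observations and analysis schedules for a filter.
runner.invoke(cli, ["schedule", "--probe-cc", "IT", "--test-name", "web_connectivity"])

# Drop and re-create the same schedules with the new reschedule command.
runner.invoke(cli, ["reschedule", "--probe-cc", "IT", "--test-name", "web_connectivity"])

# Backfill now targets a workflow name ("observations" or "analysis")
# plus the probe_cc/test_name filter, instead of a raw schedule ID.
runner.invoke(
    cli,
    [
        "backfill",
        "--workflow-name", "observations",
        "--probe-cc", "IT",
        "--test-name", "web_connectivity",
        "--start-at", "2024-01-01",
        "--end-at", "2024-01-08",
    ],
)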
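The schedule IDs introduced in schedules.py are composed of a fixed prefix, the probe_cc/test_name filter and a creation timestamp. A small sketch of the resulting names, with gen_schedule_filter_id copied from the diff (the "ALLTNS" fallback for an empty test_name list sits between the hunks above and is assumed here):

from datetime import datetime, timezone
from typing import List

OBSERVATIONS_SCHED_PREFIX = "oopln-sched-observations"
ANALYSIS_SCHED_PREFIX = "oopln-sched-analysis"

def gen_schedule_filter_id(probe_cc: List[str], test_name: List[str]) -> str:
    probe_cc_key = "ALLCCS"
    if len(probe_cc) > 0:
        probe_cc_key = ".".join(map(lambda x: x.lower(), sorted(probe_cc)))
    test_name_key = "ALLTNS"  # assumed default, not visible in the hunks above
    if len(test_name) > 0:
        test_name_key = ".".join(map(lambda x: x.lower(), sorted(test_name)))
    return f"{probe_cc_key}-{test_name_key}"

filter_id = gen_schedule_filter_id(["IT", "US"], ["web_connectivity"])
ts = datetime.now(timezone.utc).strftime("%y.%m.%d_%H%M%S")
print(f"{OBSERVATIONS_SCHED_PREFIX}-{filter_id}-{ts}")
# e.g. oopln-sched-observations-it.us-web_connectivity-24.09.06_114900
# list_existing_schedules() only matches on the prefix and filter portion, so the
# timestamp suffix is what lets a schedule be deleted and re-created without
# Temporal refusing to re-run a workflow it considers already executed.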
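For code that does not go through the CLI, the new schedules API can be driven directly, mirroring what client_operations.py does. A sketch under the assumption that TemporalConfig provides defaults so only the address needs to be set (its definition is outside this diff), with placeholder ClickHouse and data-dir values:

import asyncio
from datetime import datetime

from oonipipeline.temporal.client_operations import TemporalConfig, temporal_connect
from oonipipeline.temporal.schedules import schedule_all, schedule_backfill

async def main():
    # Placeholder connection settings; the CLI fills these from `config`.
    temporal_config = TemporalConfig(temporal_address="localhost:7233")
    client = await temporal_connect(temporal_config=temporal_config)

    # Idempotently ensure one observations and one analysis schedule exist
    # for this filter (reschedule_all would drop and re-create them instead).
    ids = await schedule_all(
        client=client,
        probe_cc=["IT"],
        test_name=["web_connectivity"],
        clickhouse_url="clickhouse://localhost/default",
        data_dir="/tmp/oonidata",
    )
    print(ids.observations, ids.analysis)

    # Backfill a date range against whichever schedule matches the same filter.
    await schedule_backfill(
        client=client,
        workflow_name="observations",
        start_at=datetime(2024, 1, 1),
        end_at=datetime(2024, 1, 8),
        probe_cc=["IT"],
        test_name=["web_connectivity"],
    )

asyncio.run(main())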