Refactoring of schedule and reschedule commands
hellais committed Sep 6, 2024
1 parent a487653 commit 7283f24
Showing 3 changed files with 295 additions and 240 deletions.
122 changes: 44 additions & 78 deletions oonipipeline/src/oonipipeline/cli/commands.py
@@ -9,18 +9,13 @@
run_backfill,
run_create_schedules,
run_status,
run_reschedule,
)
from oonipipeline.temporal.workers import start_workers

import click
from click_loglevel import LogLevel

from ..temporal.workflows.observations import (
ObservationsWorkflowParams,
)
from ..temporal.workflows.analysis import (
AnalysisWorkflowParams,
)

from ..__about__ import VERSION
from ..db.connections import ClickhouseConnection
@@ -120,7 +115,9 @@ def cli(log_level: int):
@cli.command()
@start_at_option
@end_at_option
@click.option("--schedule-id", type=str, required=True)
@probe_cc_option
@test_name_option
@click.option("--workflow-name", type=str, required=True)
@click.option(
"--create-tables",
is_flag=True,
@@ -132,16 +129,18 @@ def cli(log_level: int):
help="should we drop tables before creating them",
)
def backfill(
probe_cc: List[str],
test_name: List[str],
workflow_name: str,
start_at: datetime,
end_at: datetime,
create_tables: bool,
drop_tables: bool,
schedule_id: str,
):
"""
Backfill for OONI measurements and write them into clickhouse
"""
click.echo(f"Runnning backfill of schedule {schedule_id}")
click.echo(f"Runnning backfill of worfklow {workflow_name}")

maybe_create_delete_tables(
clickhouse_url=config.clickhouse_url,
@@ -161,8 +160,10 @@ def backfill(
)

run_backfill(
schedule_id=schedule_id,
workflow_name=workflow_name,
temporal_config=temporal_config,
probe_cc=probe_cc,
test_name=test_name,
start_at=start_at,
end_at=end_at,
)
@@ -176,66 +177,47 @@ def backfill(
is_flag=True,
help="should we fail immediately when we encounter an error?",
)
@click.option(
"--analysis/--no-analysis",
is_flag=True,
help="should we schedule an analysis",
default=False,
)
@click.option(
"--observations/--no-observations",
is_flag=True,
help="should we schedule observations",
default=True,
)
@click.option(
"--delete",
is_flag=True,
default=False,
help="if we should delete the schedule instead of creating it",
)
@click.option(
"--create-tables",
is_flag=True,
help="should we attempt to create the required clickhouse tables",
)
@click.option(
"--drop-tables",
is_flag=True,
help="should we drop tables before creating them",
)
def schedule(
probe_cc: List[str],
test_name: List[str],
fast_fail: bool,
create_tables: bool,
drop_tables: bool,
analysis: bool,
observations: bool,
delete: bool,
):
"""
Create schedules for the specified parameters
"""
if not observations and not analysis:
click.echo("either observations or analysis should be set")
return 1
temporal_config = TemporalConfig(
telemetry_endpoint=config.telemetry_endpoint,
prometheus_bind_address=config.prometheus_bind_address,
temporal_address=config.temporal_address,
temporal_namespace=config.temporal_namespace,
temporal_tls_client_cert_path=config.temporal_tls_client_cert_path,
temporal_tls_client_key_path=config.temporal_tls_client_key_path,
)

maybe_create_delete_tables(
run_create_schedules(
probe_cc=probe_cc,
test_name=test_name,
clickhouse_url=config.clickhouse_url,
create_tables=create_tables,
drop_tables=drop_tables,
clickhouse_buffer_min_time=config.clickhouse_buffer_min_time,
clickhouse_buffer_max_time=config.clickhouse_buffer_max_time,
data_dir=config.data_dir,
temporal_config=temporal_config,
)
what_we_schedule = []
if analysis:
what_we_schedule.append("analysis")
if observations:
what_we_schedule.append("observations")

click.echo(f"Scheduling {' and'.join(what_we_schedule)}")

@cli.command()
@probe_cc_option
@test_name_option
@click.option(
"--fast-fail",
is_flag=True,
help="should we fail immediately when we encounter an error?",
)
def reschedule(
probe_cc: List[str],
test_name: List[str],
fast_fail: bool,
):
"""
Recreate schedules for the specified parameters
"""
temporal_config = TemporalConfig(
telemetry_endpoint=config.telemetry_endpoint,
prometheus_bind_address=config.prometheus_bind_address,
@@ -244,29 +226,13 @@ def schedule(
temporal_tls_client_cert_path=config.temporal_tls_client_cert_path,
temporal_tls_client_key_path=config.temporal_tls_client_key_path,
)
obs_params = None
if observations:
obs_params = ObservationsWorkflowParams(
probe_cc=probe_cc,
test_name=test_name,
clickhouse=config.clickhouse_url,
data_dir=config.data_dir,
fast_fail=fast_fail,
)
analysis_params = None
if analysis:
analysis_params = AnalysisWorkflowParams(
probe_cc=probe_cc,
test_name=test_name,
clickhouse=config.clickhouse_url,
data_dir=config.data_dir,
)

run_create_schedules(
obs_params=obs_params,
analysis_params=analysis_params,
run_reschedule(
probe_cc=probe_cc,
test_name=test_name,
clickhouse_url=config.clickhouse_url,
data_dir=config.data_dir,
temporal_config=temporal_config,
delete=delete,
)


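Not part of the commit itself: a minimal sketch of how the refactored backfill command could be smoke-tested with click's CliRunner. Only --workflow-name, --create-tables and --drop-tables appear literally in the diff above; the --probe-cc, --test-name, --start-at and --end-at flag names and the date format are assumptions read off the option-decorator names (probe_cc_option, start_at_option, etc.), and a reachable Temporal server would still be needed for the command to actually succeed.

# Hedged usage sketch for the refactored CLI; flag names marked below are assumptions.
from click.testing import CliRunner

from oonipipeline.cli.commands import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    [
        "backfill",
        "--workflow-name", "observations",  # replaces the old --schedule-id
        "--probe-cc", "IT",                 # flag name assumed from probe_cc_option
        "--test-name", "web_connectivity",  # flag name assumed from test_name_option
        "--start-at", "2024-01-01",         # flag and date format assumed from start_at_option
        "--end-at", "2024-02-01",           # flag and date format assumed from end_at_option
    ],
)
print(result.exit_code, result.output)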
121 changes: 63 additions & 58 deletions oonipipeline/src/oonipipeline/temporal/client_operations.py
@@ -1,15 +1,13 @@
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from datetime import datetime
from typing import List, Optional, Tuple

from oonipipeline.temporal.workflows.analysis import AnalysisWorkflowParams
from oonipipeline.temporal.workflows.observations import ObservationsWorkflowParams

from oonipipeline.temporal.schedules import (
schedule_analysis,
schedule_observations,
ScheduleIdMap,
schedule_all,
schedule_backfill,
)

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
@@ -20,8 +18,6 @@

from temporalio.client import (
Client as TemporalClient,
ScheduleBackfill,
ScheduleOverlapPolicy,
WorkflowExecution,
)
from temporalio.service import TLSConfig
@@ -116,63 +112,45 @@ async def temporal_connect(


async def execute_backfill(
schedule_id: str,
temporal_config: TemporalConfig,
probe_cc: List[str],
test_name: List[str],
start_at: datetime,
end_at: datetime,
workflow_name: str,
temporal_config: TemporalConfig,
):
log.info(f"running backfill for schedule_id={schedule_id}")
log.info(f"creating all schedules")

client = await temporal_connect(temporal_config=temporal_config)

found_schedule_id = None
schedule_list = await client.list_schedules()
async for sched in schedule_list:
if sched.id.startswith(schedule_id):
found_schedule_id = sched.id
break
if not found_schedule_id:
log.error(f"schedule ID not found for prefix {schedule_id}")
return

handle = client.get_schedule_handle(found_schedule_id)
await handle.backfill(
ScheduleBackfill(
start_at=start_at + timedelta(hours=1),
end_at=end_at + timedelta(hours=1),
overlap=ScheduleOverlapPolicy.ALLOW_ALL,
),
return await schedule_backfill(
client=client,
probe_cc=probe_cc,
test_name=test_name,
start_at=start_at,
end_at=end_at,
workflow_name=workflow_name,
)


async def create_schedules(
obs_params: Optional[ObservationsWorkflowParams],
analysis_params: Optional[AnalysisWorkflowParams],
probe_cc: List[str],
test_name: List[str],
clickhouse_url: str,
data_dir: str,
temporal_config: TemporalConfig,
delete: bool = False,
) -> dict:
) -> ScheduleIdMap:
log.info(f"creating all schedules")

client = await temporal_connect(temporal_config=temporal_config)

obs_schedule_id = None
if obs_params is not None:
obs_schedule_id = await schedule_observations(
client=client, params=obs_params, delete=delete
)
log.info(f"created schedule observations schedule with ID={obs_schedule_id}")

analysis_schedule_id = None
if analysis_params is not None:
analysis_schedule_id = await schedule_analysis(
client=client, params=analysis_params, delete=delete
)
log.info(f"created schedule analysis schedule with ID={analysis_schedule_id}")

return {
"analysis_schedule_id": analysis_schedule_id,
"observations_schedule_id": obs_schedule_id,
}
return await schedule_all(
client=client,
probe_cc=probe_cc,
test_name=test_name,
clickhouse_url=clickhouse_url,
data_dir=data_dir,
)


async def get_status(
@@ -214,15 +192,19 @@ async def get_status(

def run_backfill(
temporal_config: TemporalConfig,
schedule_id: str,
probe_cc: List[str],
test_name: List[str],
workflow_name: str,
start_at: datetime,
end_at: datetime,
):
try:
asyncio.run(
execute_backfill(
temporal_config=temporal_config,
schedule_id=schedule_id,
workflow_name=workflow_name,
probe_cc=probe_cc,
test_name=test_name,
start_at=start_at,
end_at=end_at,
)
@@ -232,18 +214,41 @@ def run_backfill(


def run_create_schedules(
obs_params: Optional[ObservationsWorkflowParams],
analysis_params: Optional[AnalysisWorkflowParams],
probe_cc: List[str],
test_name: List[str],
clickhouse_url: str,
data_dir: str,
temporal_config: TemporalConfig,
):
try:
asyncio.run(
create_schedules(
probe_cc=probe_cc,
test_name=test_name,
clickhouse_url=clickhouse_url,
data_dir=data_dir,
temporal_config=temporal_config,
)
)
except KeyboardInterrupt:
print("shutting down")


def run_reschedule(
probe_cc: List[str],
test_name: List[str],
clickhouse_url: str,
data_dir: str,
temporal_config: TemporalConfig,
delete: bool,
):
try:
asyncio.run(
create_schedules(
obs_params=obs_params,
analysis_params=analysis_params,
probe_cc=probe_cc,
test_name=test_name,
clickhouse_url=clickhouse_url,
data_dir=data_dir,
temporal_config=temporal_config,
delete=delete,
)
)
except KeyboardInterrupt:
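For completeness, a hedged sketch of driving the refactored run_backfill entry point directly from Python, using only the signature and TemporalConfig field names visible in the diff above. The import path for TemporalConfig, the None values, and the "observations" workflow name are assumptions, not confirmed by the commit.

# Hedged driver sketch; signatures taken from the diff, concrete values assumed.
from datetime import datetime

from oonipipeline.temporal.client_operations import TemporalConfig, run_backfill

temporal_config = TemporalConfig(
    telemetry_endpoint=None,            # field names taken from the diff above
    prometheus_bind_address=None,
    temporal_address="localhost:7233",  # assumed local dev-server address
    temporal_namespace=None,
    temporal_tls_client_cert_path=None,
    temporal_tls_client_key_path=None,
)

run_backfill(
    temporal_config=temporal_config,
    probe_cc=["IT"],
    test_name=["web_connectivity"],
    workflow_name="observations",       # assumed to be a valid workflow name
    start_at=datetime(2024, 1, 1),
    end_at=datetime(2024, 2, 1),
)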
