diff --git a/oonipipeline/Readme.md b/oonipipeline/Readme.md
new file mode 100644
index 00000000..faf745b7
--- /dev/null
+++ b/oonipipeline/Readme.md
@@ -0,0 +1,44 @@
+# OONI Pipeline v5
+
+This is the fifth major iteration of the OONI Data Pipeline.
+
+For historical context, these are the major revisions:
+* `v0` - The "pipeline" was basically just writing the raw JSON files into a public `www` directory. Used until ~2013.
+* `v1` - OONI Pipeline based on custom CLI scripts using MongoDB as a backend. Used until ~2015.
+* `v2` - OONI Pipeline based on [luigi](https://luigi.readthedocs.io/en/stable/). Used until ~2017.
+* `v3` - OONI Pipeline based on [airflow](https://airflow.apache.org/). Used until ~2020.
+* `v4` - OONI Pipeline based on custom scripts and systemd units (aka fastpath). Currently in use in production.
+* `v5` - Next generation OONI Pipeline, which this README documents. Expected to be in production by Q4 2024.
+
+## Setup
+
+In order to run the pipeline you should set up the following dependencies:
+* [Temporal for Python](https://learn.temporal.io/getting_started/python/dev_environment/)
+* [Clickhouse](https://clickhouse.com/docs/en/install)
+* [hatch](https://hatch.pypa.io/1.9/install/)
+
+
+### Quick start
+
+Start the Temporal dev server:
+```
+temporal server start-dev
+```
+
+Start the ClickHouse server:
+```
+mkdir -p clickhouse-data
+clickhouse server
+```
+
+You should then start the workers by running:
+```
+hatch run oonipipeline start-workers
+```
+
+You can then start the desired workflow, for example to create signal observations for the US:
+```
+hatch run oonipipeline mkobs --probe-cc US --test-name signal --start-day 2024-01-01 --end-day 2024-01-02
+```
+
+Monitor workflow execution by accessing: http://localhost:8233/
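+
+To verify that observations were written, you can query ClickHouse directly. The snippet below is only an indicative sketch: it assumes the web observations end up in a table named `obs_web`, while the authoritative table list comes from the create queries in `oonipipeline.db.create_tables` (see also `hatch run oonipipeline checkdb`):
+```
+clickhouse client -q "SHOW TABLES"
+clickhouse client -q "SELECT count() FROM obs_web"
+```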
diff --git a/oonipipeline/debug-temporal.sh b/oonipipeline/debug-temporal.sh
deleted file mode 100644
index bbf43acc..00000000
--- a/oonipipeline/debug-temporal.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-# >>> json.dumps(asdict(ObservationsWorkflowParams(probe_cc=["IT"], start_day="2024-01-01", end_day="2024-01-02", clickhouse="clickhouse://localhost/", data_dir="/Users/art/repos/ooni/data/tests/data/", parallelism=10, fast_fail=False, test_name=["signal"])))
-#
-#
-INPUT_JSON="{\"probe_cc\": [\"IT\"], \"test_name\": [\"signal\"], \"start_day\": \"2024-01-01\", \"end_day\": \"2024-01-20\", \"clickhouse\": \"clickhouse://localhost/\", \"data_dir\": \"$(pwd)/tests/data/datadir/\", \"parallelism\": 10, \"fast_fail\": false, \"log_level\": 20}"
-
-echo $INPUT_JSON
-temporal workflow start \
-  --task-queue oonidatapipeline-task-queue \
-  --type ObservationsWorkflow \
-  --namespace default \
-  --input "$INPUT_JSON"
-
diff --git a/oonipipeline/pyproject.toml b/oonipipeline/pyproject.toml
index c5ee396d..f6ae82a5 100644
--- a/oonipipeline/pyproject.toml
+++ b/oonipipeline/pyproject.toml
@@ -63,6 +63,7 @@ path = ".venv/"
 path = "src/oonipipeline/__about__.py"
 
 [tool.hatch.envs.default.scripts]
+oonipipeline = "python -m oonipipeline.main {args}"
 test = "pytest {args:tests}"
 test-cov = "pytest -s --full-trace --log-level=INFO --log-cli-level=INFO -v --setup-show --cov=./ --cov-report=xml --cov-report=html --cov-report=term {args:tests}"
 cov-report = ["coverage report"]
diff --git a/oonipipeline/src/oonipipeline/cli/__init__.py b/oonipipeline/src/oonipipeline/cli/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/oonipipeline/src/oonipipeline/cli/commands.py b/oonipipeline/src/oonipipeline/cli/commands.py
new file mode 100644
index 00000000..c4557191
--- /dev/null
+++ b/oonipipeline/src/oonipipeline/cli/commands.py
@@ -0,0 +1,394 @@
+import asyncio
+import concurrent.futures
+import logging
+import multiprocessing
+import sys
+from pathlib import Path
+from datetime import date, timedelta, datetime
+from typing import List, Optional
+
+import click
+from click_loglevel import LogLevel
+
+from temporalio.client import Client
+from temporalio.worker import Worker
+from temporalio.types import MethodAsyncSingleParam, SelfType, ParamType, ReturnType
+
+from ..__about__ import VERSION
+from ..db.connections import ClickhouseConnection
+from ..db.create_tables import create_queries, list_all_table_diffs
+from ..netinfo import NetinfoDB
+from ..workflows.observations import ObservationsWorkflow, ObservationsWorkflowParams
+from ..workflows.observations import make_observation_in_day
+
+log = logging.getLogger("oonidata")
+
+TASK_QUEUE_NAME = "oonipipeline-task-queue"
+
+
+async def run_workflow(
+    workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType
+):
+    client = await Client.connect("localhost:7233")
+
+    await client.execute_workflow(
+        workflow,
+        arg,
+        id=TASK_QUEUE_NAME,
+        task_queue=TASK_QUEUE_NAME,
+    )
+
+
+def _parse_csv(ctx, param, s: Optional[str]) -> List[str]:
+    if s:
+        return s.split(",")
+    return []
+
+
+probe_cc_option = click.option(
+    "--probe-cc",
+    callback=_parse_csv,
+    help="two letter country code, can be comma separated for a list (eg. IT,US). If omitted, all countries will be processed.",
+)
+test_name_option = click.option(
+    "--test-name",
+    type=str,
+    callback=_parse_csv,
+    help="test_name you care to process, can be comma separated for a list (eg. web_connectivity,whatsapp). If omitted, all test names will be processed.",
+)
+start_day_option = click.option(
+    "--start-day",
+    default=(date.today() - timedelta(days=14)).strftime("%Y-%m-%d"),
+    help="""the timestamp of the day for which we should start processing data (inclusive).
+
+    Note: this is the upload date, which doesn't necessarily match the measurement date.
+    """,
+)
+end_day_option = click.option(
+    "--end-day",
+    default=(date.today() + timedelta(days=1)).strftime("%Y-%m-%d"),
+    help="""the timestamp of the day up to which we should process data.
+
+    Note: this is the upload date, which doesn't necessarily match the measurement date.
+    """,
+)
+
+clickhouse_option = click.option(
+    "--clickhouse", type=str, required=True, default="clickhouse://localhost"
+)
+
+datadir_option = click.option(
+    "--data-dir",
+    type=str,
+    required=True,
+    default="tests/data/datadir",
+    help="data directory to store fingerprint and geoip databases",
+)
+
+
+@click.group()
+@click.option("--error-log-file", type=Path)
+@click.option(
+    "-l",
+    "--log-level",
+    type=LogLevel(),
+    default="INFO",
+    help="Set logging level",
+    show_default=True,
+)
+@click.version_option(VERSION)
+def cli(error_log_file: Optional[Path], log_level: int):
+    log.addHandler(logging.StreamHandler(sys.stderr))
+    log.setLevel(log_level)
+    if error_log_file:
+        logging.basicConfig(
+            filename=error_log_file, encoding="utf-8", level=logging.ERROR
+        )
+
+
+@cli.command()
+@probe_cc_option
+@test_name_option
+@start_day_option
+@end_day_option
+@clickhouse_option
+@datadir_option
+@click.option(
+    "--parallelism",
+    type=int,
+    default=multiprocessing.cpu_count() + 2,
+    help="number of processes to use. Only works when writing to a database",
+)
+@click.option(
+    "--fast-fail",
+    is_flag=True,
+    help="should we fail immediately when we encounter an error?",
+)
+@click.option(
+    "--create-tables",
+    is_flag=True,
+    help="should we attempt to create the required clickhouse tables",
+)
+@click.option(
+    "--drop-tables",
+    is_flag=True,
+    help="should we drop tables before creating them",
+)
+def mkobs(
+    probe_cc: List[str],
+    test_name: List[str],
+    start_day: str,
+    end_day: str,
+    clickhouse: str,
+    data_dir: str,
+    parallelism: int,
+    fast_fail: bool,
+    create_tables: bool,
+    drop_tables: bool,
+):
+    """
+    Make observations for OONI measurements and write them into ClickHouse
+    """
+    if create_tables:
+        if drop_tables:
+            click.confirm(
+                "Are you sure you want to drop the tables before creation?", abort=True
+            )
+
+        with ClickhouseConnection(clickhouse) as db:
+            for query, table_name in create_queries:
+                if drop_tables:
+                    db.execute(f"DROP TABLE IF EXISTS {table_name};")
+                db.execute(query)
+
+    click.echo("Starting to process observations")
+    NetinfoDB(datadir=Path(data_dir), download=True)
+    click.echo("Downloaded netinfodb")
+
+    arg = ObservationsWorkflowParams(
+        probe_cc=probe_cc,
+        test_name=test_name,
+        start_day=start_day,
+        end_day=end_day,
+        clickhouse=clickhouse,
+        data_dir=str(data_dir),
+        parallelism=parallelism,
+        fast_fail=fast_fail,
+    )
+    click.echo(f"starting to make observations with arg={arg}")
+    asyncio.run(
+        run_workflow(
+            ObservationsWorkflow.run,
+            arg,
+        )
+    )
+
+
+@cli.command()
+@probe_cc_option
+@test_name_option
+@start_day_option
+@end_day_option
+@clickhouse_option
+@datadir_option
+@click.option(
+    "--parallelism",
+    type=int,
+    default=multiprocessing.cpu_count() + 2,
+    help="number of processes to use. Only works when writing to a database",
+)
+@click.option(
+    "--fast-fail",
+    is_flag=True,
+    help="should we fail immediately when we encounter an error?",
+)
+@click.option(
+    "--create-tables",
+    is_flag=True,
+    help="should we attempt to create the required clickhouse tables",
+)
+@click.option(
+    "--rebuild-ground-truths",
+    is_flag=True,
+    help="should we force the rebuilding of ground truths",
+)
+def mkanalysis(
+    probe_cc: List[str],
+    test_name: List[str],
+    start_day: str,
+    end_day: str,
+    clickhouse: str,
+    data_dir: str,
+    parallelism: int,
+    fast_fail: bool,
+    create_tables: bool,
+    rebuild_ground_truths: bool,
+):
+    """
+    Make analysis results from observations (not yet implemented)
+    """
+    if create_tables:
+        with ClickhouseConnection(clickhouse) as db:
+            for query, table_name in create_queries:
+                click.echo(f"Running create query for {table_name}")
+                db.execute(query)
+
+    # start_analysis(
+    #     probe_cc=probe_cc,
+    #     test_name=test_name,
+    #     start_day=start_day,
+    #     end_day=end_day,
+    #     clickhouse=clickhouse,
+    #     data_dir=data_dir,
+    #     parallelism=parallelism,
+    #     fast_fail=fast_fail,
+    #     rebuild_ground_truths=rebuild_ground_truths,
+    # )
+    raise NotImplementedError("TODO(art)")
+
+
+@cli.command()
+@start_day_option
+@end_day_option
+@clickhouse_option
+@datadir_option
+@click.option(
+    "--parallelism",
+    type=int,
+    default=multiprocessing.cpu_count() + 2,
+    help="number of processes to use. Only works when writing to a database",
+)
+def mkgt(
+    start_day: str,
+    end_day: str,
+    clickhouse: str,
+    data_dir: str,
+    parallelism: int,
+):
+    """
+    Build ground truth databases (not yet implemented)
+    """
+    # start_ground_truth_builder(
+    #     start_day=start_day,
+    #     end_day=end_day,
+    #     clickhouse=clickhouse,
+    #     data_dir=data_dir,
+    #     parallelism=parallelism,
+    # )
+    raise NotImplementedError("TODO(art)")
+
+
+@cli.command()
+@probe_cc_option
+@test_name_option
+@start_day_option
+@end_day_option
+@clickhouse_option
+@datadir_option
+@click.option("--archives-dir", type=Path, required=True)
+@click.option(
+    "--parallelism",
+    type=int,
+    default=multiprocessing.cpu_count() + 2,
+    help="number of processes to use. Only works when writing to a database",
+)
+def mkbodies(
+    probe_cc: List[str],
+    test_name: List[str],
+    start_day: str,
+    end_day: str,
+    clickhouse: str,
+    data_dir: str,
+    archives_dir: Path,
+    parallelism: int,
+):
+    """
+    Make response body archives
+    """
+    # start_response_archiver(
+    #     probe_cc=probe_cc,
+    #     test_name=test_name,
+    #     start_day=start_day,
+    #     end_day=end_day,
+    #     data_dir=data_dir,
+    #     archives_dir=archives_dir,
+    #     clickhouse=clickhouse,
+    #     parallelism=parallelism,
+    # )
+    raise NotImplementedError("TODO(art)")
+
+
+@cli.command()
+@datadir_option
+@click.option("--archives-dir", type=Path, required=True)
+@click.option(
+    "--parallelism",
+    type=int,
+    default=multiprocessing.cpu_count() + 2,
+    help="number of processes to use",
+)
+def fphunt(data_dir: str, archives_dir: Path, parallelism: int):
+    click.echo("🏹 starting the hunt for blockpage fingerprints!")
+    # start_fingerprint_hunter(
+    #     archives_dir=archives_dir,
+    #     data_dir=data_dir,
+    #     parallelism=parallelism,
+    # )
+    raise NotImplementedError("TODO(art)")
+
+
+@cli.command()
+@click.option("--clickhouse", type=str)
+@click.option(
+    "--create-tables",
+    is_flag=True,
+    help="should we attempt to create the required clickhouse tables",
+)
+@click.option(
+    "--drop-tables",
+    is_flag=True,
+    help="should we drop tables before creating them",
+)
+def checkdb(
+    clickhouse: Optional[str],
+    create_tables: bool,
+    drop_tables: bool,
+):
+    """
+    Check if the database tables require migrations. If the create-tables flag
+    is not specified, it will not perform any operations.
+ """ + + if create_tables: + if not clickhouse: + click.echo("--clickhouse needs to be specified when creating tables") + return 1 + if drop_tables: + click.confirm( + "Are you sure you want to drop the tables before creation?", abort=True + ) + + with ClickhouseConnection(clickhouse) as db: + for query, table_name in create_queries: + if drop_tables: + db.execute(f"DROP TABLE IF EXISTS {table_name};") + db.execute(query) + + with ClickhouseConnection(clickhouse) as db: + list_all_table_diffs(db) + + +@cli.command() +def start_workers(): + async def run(): + client = await Client.connect("localhost:7233") + with concurrent.futures.ThreadPoolExecutor( + max_workers=100 + ) as activity_executor: + worker = Worker( + client, + task_queue=TASK_QUEUE_NAME, + workflows=[ObservationsWorkflow], + activities=[make_observation_in_day], + activity_executor=activity_executor, + ) + await worker.run() + + asyncio.run(run()) diff --git a/oonipipeline/src/oonipipeline/main.py b/oonipipeline/src/oonipipeline/main.py index ed0e4cfc..5d678bab 100644 --- a/oonipipeline/src/oonipipeline/main.py +++ b/oonipipeline/src/oonipipeline/main.py @@ -1,31 +1,4 @@ -import asyncio - -import concurrent.futures - -from temporalio.client import Client -from temporalio.worker import Worker - -from .workflows.observations import ObservationsWorkflow -from .workflows.observations import make_observation_in_day - - -async def async_main(): - client = await Client.connect("localhost:7233") - with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor: - worker = Worker( - client, - task_queue="oonipipeline-task-queue", - workflows=[ObservationsWorkflow], - activities=[make_observation_in_day], - activity_executor=activity_executor, - ) - - await worker.run() - - -def main(): - asyncio.run(async_main()) - +from .cli.commands import cli if __name__ == "__main__": - main() + cli()