From a7b407d7787f570a8a5d6b51c12dbc9abf4648f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Fri, 24 Jan 2025 17:12:18 +0100 Subject: [PATCH] The {{ ts }} macro is resolved at runtime not during the compilation of the dag --- dags/pipeline.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/dags/pipeline.py b/dags/pipeline.py index 3c5c64cb..31051dcc 100644 --- a/dags/pipeline.py +++ b/dags/pipeline.py @@ -12,12 +12,17 @@ def run_make_observations( test_name: List[str], clickhouse_url: str, data_dir: str, - bucket_date: str, + bucket_date: str = "", + ts: str = "", ): from oonipipeline.tasks.observations import ( MakeObservationsParams, make_observations, ) + # We need to perform this transformation in here because the {{ ts }} + # parmaetrization is resolved at runtime + if bucket_date == "": + bucket_date = ts[:13] params = MakeObservationsParams( probe_cc=probe_cc, @@ -34,17 +39,23 @@ def run_make_analysis( clickhouse_url: str, probe_cc: List[str], test_name: List[str], - timestamp: str, + timestamp: str = "", + ts: str = "", ): from oonipipeline.tasks.analysis import ( MakeAnalysisParams, make_analysis, ) + # We need to perform this transformation in here because the {{ ts }} + # parmaetrization is resolved at runtime + if timestamp == "": + timestamp = ts[:13] + params = MakeAnalysisParams( probe_cc=probe_cc, test_name=test_name, - timestamp=timestamp, + timestamp=timestamp[:13], clickhouse_url=clickhouse_url, ) make_analysis(params) @@ -120,7 +131,6 @@ def run_make_analysis( max_active_runs=2, ) as dag_full: # YYYY-MM-DDTHH - start_hour = "{{ ts }}"[:13] op_make_observations_hourly = PythonVirtualenvOperator( task_id="make_observations_hourly", python_callable=run_make_observations, @@ -129,7 +139,7 @@ def run_make_analysis( "test_name": dag_full.params["test_name"], "clickhouse_url": Variable.get("clickhouse_url", default_var=""), "data_dir": Variable.get("data_dir", default_var=""), - "bucket_date": start_hour, + "ts": "{{ ts }}", }, requirements=REQUIREMENTS, system_site_packages=False, @@ -142,7 +152,7 @@ def run_make_analysis( "probe_cc": dag_full.params["probe_cc"], "test_name": dag_full.params["test_name"], "clickhouse_url": Variable.get("clickhouse_url", default_var=""), - "timestamp": start_hour, + "ts": "{{ ts }}", }, requirements=REQUIREMENTS, system_site_packages=False,