Skip to content

Commit

Permalink
The {{ ts }} macro is resolved at runtime not during the compilation …
Browse files Browse the repository at this point in the history
…of the dag
  • Loading branch information
hellais committed Jan 24, 2025
1 parent 510f2b9 commit a7b407d
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions dags/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ def run_make_observations(
test_name: List[str],
clickhouse_url: str,
data_dir: str,
bucket_date: str,
bucket_date: str = "",
ts: str = "",
):
from oonipipeline.tasks.observations import (
MakeObservationsParams,
make_observations,
)
# We need to perform this transformation in here because the {{ ts }}
# parmaetrization is resolved at runtime
if bucket_date == "":
bucket_date = ts[:13]

params = MakeObservationsParams(
probe_cc=probe_cc,
Expand All @@ -34,17 +39,23 @@ def run_make_analysis(
clickhouse_url: str,
probe_cc: List[str],
test_name: List[str],
timestamp: str,
timestamp: str = "",
ts: str = "",
):
from oonipipeline.tasks.analysis import (
MakeAnalysisParams,
make_analysis,
)

# We need to perform this transformation in here because the {{ ts }}
# parmaetrization is resolved at runtime
if timestamp == "":
timestamp = ts[:13]

params = MakeAnalysisParams(
probe_cc=probe_cc,
test_name=test_name,
timestamp=timestamp,
timestamp=timestamp[:13],
clickhouse_url=clickhouse_url,
)
make_analysis(params)
Expand Down Expand Up @@ -120,7 +131,6 @@ def run_make_analysis(
max_active_runs=2,
) as dag_full:
# YYYY-MM-DDTHH
start_hour = "{{ ts }}"[:13]
op_make_observations_hourly = PythonVirtualenvOperator(
task_id="make_observations_hourly",
python_callable=run_make_observations,
Expand All @@ -129,7 +139,7 @@ def run_make_analysis(
"test_name": dag_full.params["test_name"],
"clickhouse_url": Variable.get("clickhouse_url", default_var=""),
"data_dir": Variable.get("data_dir", default_var=""),
"bucket_date": start_hour,
"ts": "{{ ts }}",
},
requirements=REQUIREMENTS,
system_site_packages=False,
Expand All @@ -142,7 +152,7 @@ def run_make_analysis(
"probe_cc": dag_full.params["probe_cc"],
"test_name": dag_full.params["test_name"],
"clickhouse_url": Variable.get("clickhouse_url", default_var=""),
"timestamp": start_hour,
"ts": "{{ ts }}",
},
requirements=REQUIREMENTS,
system_site_packages=False,
Expand Down

0 comments on commit a7b407d

Please sign in to comment.