Skip to content

Commit

Permalink
Fix start and end time arithmetic
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Apr 25, 2024
1 parent 386b588 commit f7f463a
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
2 changes: 1 addition & 1 deletion oonipipeline/src/oonipipeline/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async def run_backfill(
ScheduleBackfill(
start_at=start_at,
end_at=end_at,
overlap=ScheduleOverlapPolicy.CANCEL_OTHER,
overlap=ScheduleOverlapPolicy.ALLOW_ALL,
),
)

Expand Down
8 changes: 6 additions & 2 deletions oonipipeline/src/oonipipeline/temporal/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:

# TODO(art): parsing the run timestamp out of the workflow ID suffix is quite sketchy. Waiting on an answer to this Temporal Slack question:
# https://temporalio.slack.com/archives/CTT84RS0P/p1714040382186429
bucket_date = "-".join(workflow_id.split("-")[-3:]).split("T")[0]
run_ts = datetime.strptime(
"-".join(workflow_id.split("-")[-3:]),
"%Y-%m-%dT%H:%M:%SZ",
)
bucket_date = (run_ts - timedelta(days=1)).strftime("%Y-%m-%d")

# read_time = workflow_info.start_time - timedelta(days=1)
# log.info(f"workflow.info().start_time={workflow.info().start_time} ")
Expand Down Expand Up @@ -195,7 +199,7 @@ async def schedule_observations(
ScheduleIntervalSpec(
every=timedelta(days=1), offset=timedelta(hours=2)
)
]
],
),
state=ScheduleState(
note="Run the observations workflow every day with an offset of 2 hours to ensure the files have been written to s3"
Expand Down
22 changes: 11 additions & 11 deletions oonipipeline/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@ def test_full_workflow(
)
assert result.exit_code == 0
# assert len(list(tmp_path.glob("*.warc.gz"))) == 1
import ipdb

ipdb.set_trace()
res = db.execute(
"SELECT bucket_date, COUNT(DISTINCT(measurement_uid)) FROM obs_web WHERE probe_cc = 'BA' GROUP BY bucket_date"
)
bucket_dict = dict(res[0])
assert bucket_dict["2022-10-20"] == 200
bucket_dict = dict(res)
assert "2022-10-20" in bucket_dict, bucket_dict
assert bucket_dict["2022-10-20"] == 200, bucket_dict

res = db.execute(
"SELECT COUNT() FROM obs_web WHERE bucket_date = '2022-10-20' AND probe_cc = 'BA'"
Expand Down Expand Up @@ -86,19 +84,21 @@ def test_full_workflow(
# Wait for the mutation to finish running
wait_for_mutations(db, "obs_web")
res = db.execute(
"SELECT COUNT() FROM obs_web WHERE bucket_date = '2022-10-20' AND probe_cc = 'BA'"
"SELECT bucket_date, COUNT(DISTINCT(measurement_uid)) FROM obs_web WHERE probe_cc = 'BA' GROUP BY bucket_date"
)
bucket_dict = dict(res)
assert "2022-10-20" in bucket_dict, bucket_dict
# By re-running it against the same date, we should still get the same observation count
assert res[0][0] == obs_count # type: ignore
assert bucket_dict["2022-10-20"] == obs_count, bucket_dict

result = cli_runner.invoke(
cli,
[
"mkgt",
"--start-day",
"2022-10-21",
"2022-10-20",
"--end-day",
"2022-10-22",
"2022-10-21",
"--data-dir",
datadir,
"--clickhouse",
Expand Down Expand Up @@ -126,9 +126,9 @@ def test_full_workflow(
"--probe-cc",
"BA",
"--start-day",
"2022-10-21",
"2022-10-20",
"--end-day",
"2022-10-22",
"2022-10-21",
"--test-name",
"web_connectivity",
"--data-dir",
Expand Down

0 comments on commit f7f463a

Please sign in to comment.